日志类
This commit is contained in:
parent
4b9c846a7a
commit
8ab2eb2d4c
@ -0,0 +1,14 @@
|
|||||||
|
package cd.casic.ci.process.engine.manager;
|
||||||
|
|
||||||
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO 用来管理日志连接
|
||||||
|
* TODO 用来写入日志
|
||||||
|
* */
|
||||||
|
public interface LoggerManager {
|
||||||
|
SseEmitter subscribe(String taskId,HttpServletRequest request);
|
||||||
|
void append(String taskId,String logContent);
|
||||||
|
String getLogContent(String taskId);
|
||||||
|
}
|
@ -0,0 +1,118 @@
|
|||||||
|
package cd.casic.ci.process.engine.manager.impl;
|
||||||
|
|
||||||
|
import cd.casic.ci.process.engine.manager.LoggerManager;
|
||||||
|
import cd.casic.ci.process.process.dal.pipeline.PipTaskLogDao;
|
||||||
|
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
|
||||||
|
import cd.casic.framework.commons.util.network.IpUtil;
|
||||||
|
import cd.casic.framework.commons.util.util.WebFrameworkUtils;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
/**
|
||||||
|
* 流水线运行时日志管理
|
||||||
|
* */
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class MemoryLogManager implements LoggerManager {
|
||||||
|
/**
|
||||||
|
* 第一级 是taskId,可能同一个账号、IP、建立多个连接所以保存为list
|
||||||
|
* */
|
||||||
|
private final static Map<String, List<SseEmitter>> taskIdSSEMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final static Map<String,StringBuffer> taskIdMemoryLogMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public static final Integer FLUSH_DB_SIZE=2*1024*1024;
|
||||||
|
/**
|
||||||
|
* 缓存最近一次执行的日志,key是taskId,val是数据库id(已入库的情况下)用于buffer满了增加日志内容
|
||||||
|
* 读取日志的时候同时读取数据库和内存中的日志
|
||||||
|
* */
|
||||||
|
private final static Map<String,String> taskIdDbMap = new ConcurrentHashMap<>();
|
||||||
|
@Resource
|
||||||
|
private PipTaskLogDao logDao;
|
||||||
|
@Override
|
||||||
|
public SseEmitter subscribe(String taskId,HttpServletRequest request) {
|
||||||
|
String ipAddr = IpUtil.getIpAddr(request);
|
||||||
|
SseEmitter emitter = new SseEmitter(3000L);
|
||||||
|
emitterInit(emitter,taskId);
|
||||||
|
Long loginUserId = WebFrameworkUtils.getLoginUserId();
|
||||||
|
log.info("SSE连接建立");
|
||||||
|
log.info("当前请求ip:{}",ipAddr);
|
||||||
|
log.info("当前用户id:{}",loginUserId);
|
||||||
|
log.info("当前taskId:{}",loginUserId);
|
||||||
|
List<SseEmitter> taskIdSSEList = taskIdSSEMap.getOrDefault(taskId, new ArrayList<>(1));
|
||||||
|
taskIdSSEList.add(emitter);
|
||||||
|
return emitter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void append(String taskId, String logContent) {
|
||||||
|
/**
|
||||||
|
* 先往内存里写
|
||||||
|
* 内存满4mb入库,查询日志和入库操作用同一把锁
|
||||||
|
* 然后新内容推sse
|
||||||
|
* */
|
||||||
|
StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer());
|
||||||
|
int length = logCache.length();
|
||||||
|
if (length>=FLUSH_DB_SIZE) {
|
||||||
|
synchronized (this){
|
||||||
|
if (length>=FLUSH_DB_SIZE) {
|
||||||
|
StringBuffer stringBuffer = new StringBuffer();
|
||||||
|
// 清空缓存
|
||||||
|
taskIdMemoryLogMap.put(taskId, stringBuffer);
|
||||||
|
// 入库或者更新
|
||||||
|
if (taskIdDbMap.containsKey(taskId)) {
|
||||||
|
// 之前已经入库过
|
||||||
|
// 存在则更新
|
||||||
|
String id = taskIdDbMap.get(taskId);
|
||||||
|
PipTaskLog pipTaskLog = logDao.selectById(id);
|
||||||
|
pipTaskLog.setContent(pipTaskLog.getContent()+logCache.toString());
|
||||||
|
logDao.updateById(pipTaskLog);
|
||||||
|
} else {
|
||||||
|
// 不存在就新增
|
||||||
|
PipTaskLog pipTaskLog = new PipTaskLog();
|
||||||
|
pipTaskLog.setTaskId(taskId);
|
||||||
|
pipTaskLog.setContent(logCache.toString());
|
||||||
|
logDao.insert(pipTaskLog);
|
||||||
|
taskIdDbMap.put(taskId,pipTaskLog.getId());
|
||||||
|
}
|
||||||
|
logCache = stringBuffer;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logCache.append(logContent);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getLogContent(String taskId) {
|
||||||
|
StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer());
|
||||||
|
String id = taskIdDbMap.get(taskId);
|
||||||
|
if (id != null) {
|
||||||
|
PipTaskLog pipTaskLog = logDao.selectById(id);
|
||||||
|
return pipTaskLog.getContent()+logCache.toString();
|
||||||
|
}
|
||||||
|
return logCache.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void emitterInit(SseEmitter emitter,String taskId){
|
||||||
|
emitter.onCompletion(()->{
|
||||||
|
taskIdSSEMap.remove(taskId);
|
||||||
|
log.info("===================taskId:{}断开连接===============",taskId);
|
||||||
|
});
|
||||||
|
emitter.onError((e)->{
|
||||||
|
taskIdSSEMap.remove(taskId);
|
||||||
|
log.error("===================错误,taskId:{}断开连接===============",taskId,e);
|
||||||
|
});
|
||||||
|
emitter.onTimeout(()->{
|
||||||
|
taskIdSSEMap.remove(taskId);
|
||||||
|
log.error("===================超时,taskId:{}断开连接===============",taskId);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,7 @@
|
|||||||
|
package cd.casic.ci.process.process.dal.pipeline;
|
||||||
|
|
||||||
|
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
|
||||||
|
import cd.casic.framework.mybatis.core.mapper.BaseMapperX;
|
||||||
|
|
||||||
|
public interface PipTaskLogDao extends BaseMapperX<PipTaskLog> {
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package cd.casic.server.controller;
|
||||||
|
|
||||||
|
import cd.casic.ci.process.engine.manager.LoggerManager;
|
||||||
|
import cd.casic.framework.commons.util.network.IpUtil;
|
||||||
|
import cd.casic.framework.commons.util.util.WebFrameworkUtils;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/sse")
|
||||||
|
public class SSEController {
|
||||||
|
@Resource
|
||||||
|
private LoggerManager loggerManager;
|
||||||
|
@GetMapping("/subscribe/log/{taskId}")
|
||||||
|
public SseEmitter log(HttpServletRequest request,@PathVariable String taskId){
|
||||||
|
return loggerManager.subscribe(taskId, request);
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user