Compare commits

...

2 Commits

Author SHA1 Message Date
even
54377af6df 日志修改 2025-05-26 19:35:25 +08:00
even
8ab2eb2d4c 日志类 2025-05-26 19:33:02 +08:00
7 changed files with 172 additions and 11 deletions

View File

@ -0,0 +1,14 @@
package cd.casic.ci.process.engine.manager;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* TODO 用来管理日志连接
* TODO 用来写入日志
* */
public interface LoggerManager {
SseEmitter subscribe(String taskId,HttpServletRequest request);
void append(String taskId,String logContent);
String getLogContent(String taskId);
}

View File

@ -0,0 +1,118 @@
package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.process.dal.pipeline.PipTaskLogDao;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.framework.commons.util.network.IpUtil;
import cd.casic.framework.commons.util.util.WebFrameworkUtils;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 流水线运行时日志管理
* */
@Service
@Slf4j
public class MemoryLogManager implements LoggerManager {
/**
* 第一级 是taskId可能同一个账号IP建立多个连接所以保存为list
* */
private final static Map<String, List<SseEmitter>> taskIdSSEMap = new ConcurrentHashMap<>();
private final static Map<String,StringBuffer> taskIdMemoryLogMap = new ConcurrentHashMap<>();
public static final Integer FLUSH_DB_SIZE=2*1024*1024;
/**
* 缓存最近一次执行的日志key是taskIdval是数据库id已入库的情况下用于buffer满了增加日志内容
* 读取日志的时候同时读取数据库和内存中的日志
* */
private final static Map<String,String> taskIdDbMap = new ConcurrentHashMap<>();
@Resource
private PipTaskLogDao logDao;
@Override
public SseEmitter subscribe(String taskId,HttpServletRequest request) {
String ipAddr = IpUtil.getIpAddr(request);
SseEmitter emitter = new SseEmitter(3000L);
emitterInit(emitter,taskId);
Long loginUserId = WebFrameworkUtils.getLoginUserId();
log.info("SSE连接建立");
log.info("当前请求ip{}",ipAddr);
log.info("当前用户id{}",loginUserId);
log.info("当前taskId{}",loginUserId);
List<SseEmitter> taskIdSSEList = taskIdSSEMap.getOrDefault(taskId, new ArrayList<>(1));
taskIdSSEList.add(emitter);
return emitter;
}
@Override
public void append(String taskId, String logContent) {
/**
* 先往内存里写
* 内存满4mb入库查询日志和入库操作用同一把锁
* 然后新内容推sse
* */
StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer());
int length = logCache.length();
if (length>=FLUSH_DB_SIZE) {
synchronized (this){
if (length>=FLUSH_DB_SIZE) {
StringBuffer stringBuffer = new StringBuffer();
// 清空缓存
taskIdMemoryLogMap.put(taskId, stringBuffer);
// 入库或者更新
if (taskIdDbMap.containsKey(taskId)) {
// 之前已经入库过
// 存在则更新
String id = taskIdDbMap.get(taskId);
PipTaskLog pipTaskLog = logDao.selectById(id);
pipTaskLog.setContent(pipTaskLog.getContent()+logCache.toString());
logDao.updateById(pipTaskLog);
} else {
// 不存在就新增
PipTaskLog pipTaskLog = new PipTaskLog();
pipTaskLog.setTaskId(taskId);
pipTaskLog.setContent(logCache.toString());
logDao.insert(pipTaskLog);
taskIdDbMap.put(taskId,pipTaskLog.getId());
}
logCache = stringBuffer;
}
}
}
logCache.append(logContent);
}
@Override
public String getLogContent(String taskId) {
StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer());
String id = taskIdDbMap.get(taskId);
if (id != null) {
PipTaskLog pipTaskLog = logDao.selectById(id);
return pipTaskLog.getContent()+logCache.toString();
}
return logCache.toString();
}
private void emitterInit(SseEmitter emitter,String taskId){
emitter.onCompletion(()->{
taskIdSSEMap.remove(taskId);
log.info("===================taskId:{}断开连接===============",taskId);
});
emitter.onError((e)->{
taskIdSSEMap.remove(taskId);
log.error("===================错误taskId:{}断开连接===============",taskId,e);
});
emitter.onTimeout(()->{
taskIdSSEMap.remove(taskId);
log.error("===================超时taskId:{}断开连接===============",taskId);
});
}
}

View File

@ -0,0 +1,7 @@
package cd.casic.ci.process.process.dal.pipeline;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.framework.mybatis.core.mapper.BaseMapperX;
public interface PipTaskLogDao extends BaseMapperX<PipTaskLog> {
}

View File

@ -23,5 +23,5 @@ public interface TaskService extends IService<PipTask> {
void copyTask(String taskId); void copyTask(String taskId);
Boolean updateTask(@RequestBody TaskUpdateReq req); Boolean updateTask(@RequestBody TaskUpdateReq req);
CommonResult<PipTaskLog> getLogContentByTaskId(@PathVariable String taskId); CommonResult<String> getLogContentByTaskId(@PathVariable String taskId);
} }

View File

@ -3,6 +3,7 @@ package cd.casic.ci.process.process.service.task.impl;
import cd.casic.ci.common.pipeline.req.task.TaskUpdateReq; import cd.casic.ci.common.pipeline.req.task.TaskUpdateReq;
import cd.casic.ci.common.pipeline.resp.task.TasksResp; import cd.casic.ci.common.pipeline.resp.task.TasksResp;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.process.dal.pipeline.PipTaskDao; import cd.casic.ci.process.process.dal.pipeline.PipTaskDao;
@ -34,6 +35,8 @@ public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements
private PipTaskDao taskDao; private PipTaskDao taskDao;
@Resource @Resource
private RunContextManager contextManager; private RunContextManager contextManager;
@Resource
private LoggerManager loggerManager;
@Override @Override
@ -120,14 +123,8 @@ public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements
} }
@Override @Override
public CommonResult<PipTaskLog> getLogContentByTaskId(String taskId) { public CommonResult<String> getLogContentByTaskId(String taskId) {
BaseRunContext context = contextManager.getContext(taskId);
if (context==null) { return CommonResult.success(loggerManager.getLogContent(taskId));
return CommonResult.success(new PipTaskLog());
}
if (context.getLocalVariables().get(EngineRuntimeConstant.LOG_KEY) instanceof PipTaskLog log) {
return CommonResult.success(log);
}
return CommonResult.success(new PipTaskLog());
} }
} }

View File

@ -0,0 +1,25 @@
package cd.casic.server.controller;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.framework.commons.util.network.IpUtil;
import cd.casic.framework.commons.util.util.WebFrameworkUtils;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@RestController
@RequestMapping("/sse")
public class SSEController {
@Resource
private LoggerManager loggerManager;
@GetMapping("/subscribe/log/{taskId}")
public SseEmitter log(HttpServletRequest request,@PathVariable String taskId){
return loggerManager.subscribe(taskId, request);
}
}

View File

@ -47,7 +47,7 @@ public class TasksController {
return CommonResult.success(b); return CommonResult.success(b);
} }
@PostMapping("/getLogContentByTaskId/{taskId}") @PostMapping("/getLogContentByTaskId/{taskId}")
public CommonResult<PipTaskLog> getLogContentByTaskId(@PathVariable String taskId){ public CommonResult<String> getLogContentByTaskId(@PathVariable String taskId){
return taskService.getLogContentByTaskId(taskId); return taskService.getLogContentByTaskId(taskId);
} }