From 8ab2eb2d4ce8ee9c046401c0ddc854c38aeb10c0 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Mon, 26 May 2025 19:33:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../process/engine/manager/LoggerManager.java | 14 +++ .../engine/manager/impl/MemoryLogManager.java | 118 ++++++++++++++++++ .../process/dal/pipeline/PipTaskLogDao.java | 7 ++ .../server/controller/SSEController.java | 25 ++++ 4 files changed, 164 insertions(+) create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/LoggerManager.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dal/pipeline/PipTaskLogDao.java create mode 100644 ops-server/src/main/java/cd/casic/server/controller/SSEController.java diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/LoggerManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/LoggerManager.java new file mode 100644 index 00000000..993ade41 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/LoggerManager.java @@ -0,0 +1,14 @@ +package cd.casic.ci.process.engine.manager; + +import jakarta.servlet.http.HttpServletRequest; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +/** + * TODO 用来管理日志连接 + * TODO 用来写入日志 + * */ +public interface LoggerManager { + SseEmitter subscribe(String taskId,HttpServletRequest request); + void append(String taskId,String logContent); + String getLogContent(String taskId); +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java new file mode 100644 index 00000000..48b671fa --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java @@ -0,0 +1,118 @@ +package cd.casic.ci.process.engine.manager.impl; + +import cd.casic.ci.process.engine.manager.LoggerManager; +import cd.casic.ci.process.process.dal.pipeline.PipTaskLogDao; +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.framework.commons.util.network.IpUtil; +import cd.casic.framework.commons.util.util.WebFrameworkUtils; +import jakarta.annotation.Resource; +import jakarta.servlet.http.HttpServletRequest; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +/** + * 流水线运行时日志管理 + * */ +@Service +@Slf4j +public class MemoryLogManager implements LoggerManager { + /** + * 第一级 是taskId,可能同一个账号、IP、建立多个连接所以保存为list + * */ + private final static Map> taskIdSSEMap = new ConcurrentHashMap<>(); + + private final static Map taskIdMemoryLogMap = new ConcurrentHashMap<>(); + + public static final Integer FLUSH_DB_SIZE=2*1024*1024; + /** + * 缓存最近一次执行的日志,key是taskId,val是数据库id(已入库的情况下)用于buffer满了增加日志内容 + * 读取日志的时候同时读取数据库和内存中的日志 + * */ + private final static Map taskIdDbMap = new ConcurrentHashMap<>(); + @Resource + private PipTaskLogDao logDao; + @Override + public SseEmitter subscribe(String taskId,HttpServletRequest request) { + String ipAddr = IpUtil.getIpAddr(request); + SseEmitter emitter = new SseEmitter(3000L); + emitterInit(emitter,taskId); + Long loginUserId = WebFrameworkUtils.getLoginUserId(); + log.info("SSE连接建立"); + log.info("当前请求ip:{}",ipAddr); + log.info("当前用户id:{}",loginUserId); + log.info("当前taskId:{}",loginUserId); + List taskIdSSEList = taskIdSSEMap.getOrDefault(taskId, new ArrayList<>(1)); + taskIdSSEList.add(emitter); + return emitter; + } + + @Override + public void append(String taskId, String logContent) { + /** + * 先往内存里写 + * 内存满4mb入库,查询日志和入库操作用同一把锁 + * 然后新内容推sse + * */ + StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer()); + int length = logCache.length(); + if (length>=FLUSH_DB_SIZE) { + synchronized (this){ + if (length>=FLUSH_DB_SIZE) { + StringBuffer stringBuffer = new StringBuffer(); + // 清空缓存 + taskIdMemoryLogMap.put(taskId, stringBuffer); + // 入库或者更新 + if (taskIdDbMap.containsKey(taskId)) { + // 之前已经入库过 + // 存在则更新 + String id = taskIdDbMap.get(taskId); + PipTaskLog pipTaskLog = logDao.selectById(id); + pipTaskLog.setContent(pipTaskLog.getContent()+logCache.toString()); + logDao.updateById(pipTaskLog); + } else { + // 不存在就新增 + PipTaskLog pipTaskLog = new PipTaskLog(); + pipTaskLog.setTaskId(taskId); + pipTaskLog.setContent(logCache.toString()); + logDao.insert(pipTaskLog); + taskIdDbMap.put(taskId,pipTaskLog.getId()); + } + logCache = stringBuffer; + } + } + } + logCache.append(logContent); + } + + @Override + public String getLogContent(String taskId) { + StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer()); + String id = taskIdDbMap.get(taskId); + if (id != null) { + PipTaskLog pipTaskLog = logDao.selectById(id); + return pipTaskLog.getContent()+logCache.toString(); + } + return logCache.toString(); + } + + private void emitterInit(SseEmitter emitter,String taskId){ + emitter.onCompletion(()->{ + taskIdSSEMap.remove(taskId); + log.info("===================taskId:{}断开连接===============",taskId); + }); + emitter.onError((e)->{ + taskIdSSEMap.remove(taskId); + log.error("===================错误,taskId:{}断开连接===============",taskId,e); + }); + emitter.onTimeout(()->{ + taskIdSSEMap.remove(taskId); + log.error("===================超时,taskId:{}断开连接===============",taskId); + }); + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dal/pipeline/PipTaskLogDao.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dal/pipeline/PipTaskLogDao.java new file mode 100644 index 00000000..47fcd1ff --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dal/pipeline/PipTaskLogDao.java @@ -0,0 +1,7 @@ +package cd.casic.ci.process.process.dal.pipeline; + +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.framework.mybatis.core.mapper.BaseMapperX; + +public interface PipTaskLogDao extends BaseMapperX { +} diff --git a/ops-server/src/main/java/cd/casic/server/controller/SSEController.java b/ops-server/src/main/java/cd/casic/server/controller/SSEController.java new file mode 100644 index 00000000..348f6b1c --- /dev/null +++ b/ops-server/src/main/java/cd/casic/server/controller/SSEController.java @@ -0,0 +1,25 @@ +package cd.casic.server.controller; + +import cd.casic.ci.process.engine.manager.LoggerManager; +import cd.casic.framework.commons.util.network.IpUtil; +import cd.casic.framework.commons.util.util.WebFrameworkUtils; +import jakarta.annotation.Resource; +import jakarta.servlet.http.HttpServletRequest; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; + +@RestController +@RequestMapping("/sse") +public class SSEController { + @Resource + private LoggerManager loggerManager; + @GetMapping("/subscribe/log/{taskId}") + public SseEmitter log(HttpServletRequest request,@PathVariable String taskId){ + return loggerManager.subscribe(taskId, request); + } +}