diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java index 0ba973fd..e33ca32b 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java @@ -17,6 +17,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + /** * 流水线运行时日志管理 * */ @@ -31,6 +35,7 @@ public class MemoryLogManager implements LoggerManager { private final Map taskIdMemoryLogMap = new ConcurrentHashMap<>(); public final Integer FLUSH_DB_SIZE=2*1024*1024; + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); /** * 缓存最近一次执行的日志,key是taskId,val是数据库id(已入库的情况下)用于buffer满了增加日志内容 * 读取日志的时候同时读取数据库和内存中的日志 @@ -41,7 +46,7 @@ public class MemoryLogManager implements LoggerManager { @Override public SseEmitter subscribe(String taskId,HttpServletRequest request) { String ipAddr = IpUtil.getIpAddr(request); - SseEmitter emitter = new SseEmitter(3000L); + SseEmitter emitter = new SseEmitter(30000L); emitterInit(emitter,taskId); Long loginUserId = WebFrameworkUtils.getLoginUserId(); log.info("SSE连接建立"); @@ -114,6 +119,13 @@ public class MemoryLogManager implements LoggerManager { } private void emitterInit(SseEmitter emitter,String taskId){ + scheduler.scheduleWithFixedDelay(()-> { + try { + emitter.send("\n\n"); + } catch (IOException e) { + log.error("",e); + } + },0,30,TimeUnit.SECONDS); emitter.onCompletion(()->{ taskIdSSEMap.remove(taskId); log.info("===================taskId:{}断开连接===============",taskId);