diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java index 724e63d0..264e0c57 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java @@ -16,10 +16,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * 流水线运行时日志管理 @@ -120,23 +117,26 @@ public class MemoryLogManager implements LoggerManager { private void emitterInit(SseEmitter emitter,String taskId){ // 维持心跳 - scheduler.scheduleWithFixedDelay(()-> { + ScheduledFuture scheduledFuture = scheduler.scheduleWithFixedDelay(() -> { try { - emitter.send("\n\n"); + emitter.send(SseEmitter.event().comment("heartbeat")); } catch (IOException e) { - log.error("",e); + log.error("", e); } - },0,30,TimeUnit.SECONDS); + }, 0, 30, TimeUnit.SECONDS); emitter.onCompletion(()->{ taskIdSSEMap.remove(taskId); + scheduledFuture.cancel(true); log.info("===================taskId:{}断开连接===============",taskId); }); emitter.onError((e)->{ taskIdSSEMap.remove(taskId); + scheduledFuture.cancel(true); log.error("===================错误,taskId:{}断开连接===============",taskId,e); }); emitter.onTimeout(()->{ taskIdSSEMap.remove(taskId); + scheduledFuture.cancel(true); log.error("===================超时,taskId:{}断开连接===============",taskId); }); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/AFLWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/AFLWorker.java new file mode 100644 index 00000000..5598b1c1 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/AFLWorker.java @@ -0,0 +1,12 @@ +package cd.casic.ci.process.engine.worker; + +import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.runContext.TaskRunContext; + +@Plugin(taskType = "AFL") +public class AFLWorker extends SshWorker{ + @Override + public void execute(TaskRunContext context) { + + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestCaseGenerationWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestCaseGenerationWorker.java new file mode 100644 index 00000000..72b65770 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestCaseGenerationWorker.java @@ -0,0 +1,12 @@ +package cd.casic.ci.process.engine.worker; + +import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.runContext.TaskRunContext; + +@Plugin(taskType = "TEST_CASE_GENERATION") +public class TestCaseGenerationWorker extends SshWorker{ + @Override + public void execute(TaskRunContext context) { + + } +}