SSE心跳+worker还有taskType定义

This commit is contained in:
even 2025-05-27 16:15:22 +08:00
parent 0734d01fb2
commit 43a950e816
3 changed files with 32 additions and 8 deletions

View File

@ -16,10 +16,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/** /**
* 流水线运行时日志管理 * 流水线运行时日志管理
@ -120,23 +117,26 @@ public class MemoryLogManager implements LoggerManager {
private void emitterInit(SseEmitter emitter,String taskId){ private void emitterInit(SseEmitter emitter,String taskId){
// 维持心跳 // 维持心跳
scheduler.scheduleWithFixedDelay(()-> { ScheduledFuture<?> scheduledFuture = scheduler.scheduleWithFixedDelay(() -> {
try { try {
emitter.send("\n\n"); emitter.send(SseEmitter.event().comment("heartbeat"));
} catch (IOException e) { } catch (IOException e) {
log.error("", e); log.error("", e);
} }
}, 0, 30, TimeUnit.SECONDS); }, 0, 30, TimeUnit.SECONDS);
emitter.onCompletion(()->{ emitter.onCompletion(()->{
taskIdSSEMap.remove(taskId); taskIdSSEMap.remove(taskId);
scheduledFuture.cancel(true);
log.info("===================taskId:{}断开连接===============",taskId); log.info("===================taskId:{}断开连接===============",taskId);
}); });
emitter.onError((e)->{ emitter.onError((e)->{
taskIdSSEMap.remove(taskId); taskIdSSEMap.remove(taskId);
scheduledFuture.cancel(true);
log.error("===================错误taskId:{}断开连接===============",taskId,e); log.error("===================错误taskId:{}断开连接===============",taskId,e);
}); });
emitter.onTimeout(()->{ emitter.onTimeout(()->{
taskIdSSEMap.remove(taskId); taskIdSSEMap.remove(taskId);
scheduledFuture.cancel(true);
log.error("===================超时taskId:{}断开连接===============",taskId); log.error("===================超时taskId:{}断开连接===============",taskId);
}); });
} }

View File

@ -0,0 +1,12 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
@Plugin(taskType = "AFL")
public class AFLWorker extends SshWorker{
@Override
public void execute(TaskRunContext context) {
}
}

View File

@ -0,0 +1,12 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
@Plugin(taskType = "TEST_CASE_GENERATION")
public class TestCaseGenerationWorker extends SshWorker{
@Override
public void execute(TaskRunContext context) {
}
}