test2通过(如果正确注册context就能找到worker执行,还差分发逻辑暂时没测试)

This commit is contained in:
even 2025-05-19 14:32:43 +08:00
parent e3fc790948
commit 4280521a3c
3 changed files with 7 additions and 5 deletions

View File

@ -52,12 +52,12 @@ public class ParallelDispatcher implements BaseDispatcher{
for (PipStage secondStage : stageList) {
// 二阶段下所有task是串行所以不用关心线程安全相关信息
SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new ConcurrentHashMap<>());
runContextManager.contextRegister(context);
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate);
// 给线程池进行执行
taskExecutor.execute(serialDispatcher);
}
// 等待当前阶段执行
latch.await();
}

View File

@ -12,6 +12,7 @@ import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,12 +40,13 @@ public class SerialDispatcher implements BaseDispatcher {
@Override
public void dispatch() {
for (PipTask pipTask : taskList) {
// 注册taskContext且发送消息至消息队列给work执行
TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext);
// 注册taskContext且发送消息至消息队列给work执行, 如果需要则传入参数
TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>());
contextManager.contextRegister(taskRunContext);
taskRunContext.changeContextState(ContextStateEnum.READY);
TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
redisMQTemplate.send(taskRunMessage);
// TODO 监听当前taskContext状态
}
}

View File

@ -11,8 +11,8 @@ import java.util.HashMap;
import java.util.Map;
public class TaskRunContext extends BaseRunContext{
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext) {
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), new HashMap<>(), new HashMap<>());
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext,Map<String,Object> localVariable) {
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(),localVariable, new HashMap<>());
}
private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);