diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java index 7ac1c45a..22b376a2 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java @@ -11,7 +11,7 @@ import java.util.concurrent.ThreadPoolExecutor; @Configuration public class ExecutorConfig { @Bean("pipelineExecutor") - public ThreadPoolTaskExecutor taskExecutor() { + public ThreadPoolTaskExecutor pipelineExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); @@ -22,4 +22,16 @@ public class ExecutorConfig { executor.initialize(); // 必须手动触发初始化 return executor; } + @Bean("taskExecutor") + public ThreadPoolTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("Task-"); + ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); + executor.setRejectedExecutionHandler(callerRunsPolicy); + executor.initialize(); // 必须手动触发初始化 + return executor; + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index c317d37e..99d43b8d 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -7,7 +7,9 @@ import cd.casic.ci.process.engine.runContext.PipelineRunContext; import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.stage.PipStage; +import cd.casic.framework.mq.redis.core.RedisMQTemplate; import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.CollectionUtils; import java.util.Collection; @@ -24,13 +26,17 @@ public class ParallelDispatcher implements BaseDispatcher{ private Integer stageIndex; private PipelineRunContext pipelineRunContext; private RunContextManager runContextManager; + private RedisMQTemplate redisMQTemplate; + private ThreadPoolTaskExecutor taskExecutor; - public ParallelDispatcher(List firstStageList, PipelineRunContext context,RunContextManager contextManager) { + public ParallelDispatcher(List firstStageList, PipelineRunContext context,RunContextManager contextManager,RedisMQTemplate redisMQTemplate,ThreadPoolTaskExecutor taskExecutor) { this.firstStageList = firstStageList; this.pipelineRunContext = context; this.stageIndex = 0; this.runContextManager = contextManager; contextManager.contextRegister(context); + this.redisMQTemplate = redisMQTemplate; + this.taskExecutor = taskExecutor; } @Override @@ -45,11 +51,12 @@ public class ParallelDispatcher implements BaseDispatcher{ CountDownLatch latch = new CountDownLatch(stageList.size()); for (PipStage secondStage : stageList) { // 二阶段下所有task是串行所以不用关心线程安全相关信息 - SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new HashMap<>(),new HashMap<>()); + SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new ConcurrentHashMap<>()); runContextManager.contextRegister(context); - SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch); + SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate); // 给线程池进行执行 + taskExecutor.execute(serialDispatcher); } latch.await(); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index e4d019b0..e1999a84 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -1,22 +1,30 @@ package cd.casic.ci.process.engine.dispatcher.impl; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; +import cd.casic.ci.process.engine.enums.ContextStateEnum; +import cd.casic.ci.process.engine.manager.RunContextManager; +import cd.casic.ci.process.engine.message.TaskRunMessage; import cd.casic.ci.process.engine.runContext.SecondStageRunContext; +import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.framework.mq.redis.core.RedisMQTemplate; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; public class SerialDispatcher implements BaseDispatcher { private SecondStageRunContext stageRunContext; private List taskList; private CountDownLatch latch; private String triggerModel; + private RedisMQTemplate redisMQTemplate; + private RunContextManager contextManager; - public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) { + public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch,RunContextManager contextManager,RedisMQTemplate redisMQTemplate) { this.stageRunContext = stageRunContext; PipBaseElement contextDef = stageRunContext.getContextDef(); if (contextDef instanceof PipStage secondStage) { @@ -24,12 +32,19 @@ public class SerialDispatcher implements BaseDispatcher { this.taskList = secondStage.getTaskValues(); } this.latch = latch; + this.redisMQTemplate = redisMQTemplate; + this.contextManager = contextManager; } @Override public void dispatch() { for (PipTask pipTask : taskList) { - // TODO 注册taskContext,且发送消息至消息队列给work执行 + // 注册taskContext,且发送消息至消息队列给work执行 + TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext); + contextManager.contextRegister(taskRunContext); + taskRunContext.setState(new AtomicInteger(ContextStateEnum.READY.getCode())); + TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask); + redisMQTemplate.send(taskRunMessage); } } @@ -40,10 +55,10 @@ public class SerialDispatcher implements BaseDispatcher { // TODO 看看需要内存入库还是忽略掉当前执行,进行入库(countDown放行) // stageRunContext.callParentChange(); dispatch(); - latch.countDown(); } catch (Exception e) { // throw new RuntimeException(e); // TODO 当前stage标记为失败等待父context发现并处理 } + latch.countDown(); } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java index b3a934af..2d41fdd9 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java @@ -12,6 +12,7 @@ import cd.casic.ci.process.process.service.pipeline.PipelineService; import cd.casic.ci.process.process.service.stage.StageService; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import cd.casic.framework.mq.redis.core.RedisMQTemplate; import jakarta.annotation.Resource; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -34,7 +35,12 @@ public class DefaultPipelineExecutor implements PipelineExecutor { private RunContextManager runContextManager; @Resource @Qualifier("pipelineExecutor") + private ThreadPoolTaskExecutor pipelineExecutor; + @Resource + @Qualifier("taskExecutor") private ThreadPoolTaskExecutor taskExecutor; + @Resource + private RedisMQTemplate redisMQTemplate; @Override public PipelineRunContext execute(String pipelineId) { PipPipeline pipeline = pipelineService.getById(pipelineId); @@ -49,7 +55,7 @@ public class DefaultPipelineExecutor implements PipelineExecutor { throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息"); } PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); - ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager); + ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,taskExecutor); taskExecutor.execute(parallelDispatcher); return pipelineRunContext; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/WorkerManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/WorkerManager.java index 8b3e3b77..47d976c3 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/WorkerManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/WorkerManager.java @@ -1,4 +1,6 @@ package cd.casic.ci.process.engine.manager; - +/** + * 负责监听队列,找到ContextManager获取runContext,然后实际执行 + * */ public interface WorkerManager { } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java index 9d32452d..92621ca3 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java @@ -2,10 +2,20 @@ package cd.casic.ci.process.engine.manager.impl; import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.runContext.BaseRunContext; +import cd.casic.ci.process.engine.runContext.PipelineRunContext; +import cd.casic.ci.process.engine.runContext.SecondStageRunContext; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.framework.commons.exception.ServiceException; +import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import org.springframework.stereotype.Component; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + @Component public class DefaultRunContextManager implements RunContextManager { + private final Map contextMap= new ConcurrentHashMap(); @Override public Boolean stopPipeline(String pipelineId) { return null; @@ -28,16 +38,50 @@ public class DefaultRunContextManager implements RunContextManager { @Override public Boolean suspendStage(String pipelineId, String stageId) { + PipelineRunContext pipelineRunContext = contextMap.get(pipelineId); + if (pipelineRunContext !=null) { + + } return null; } @Override public void contextRegister(BaseRunContext context) { - + PipBaseElement contextDef = context.getContextDef(); + String id = contextDef.getId(); + BaseRunContext parentContext = context.getParentContext(); + if (context instanceof PipelineRunContext) { + contextMap.put(id,(PipelineRunContext)context); + } else { + if (parentContext==null) { + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"注册context失败"); + } + PipBaseElement parentContextDef = parentContext.getContextDef(); + BaseRunContext parentRunContext = null; + for (Map.Entry entry : contextMap.entrySet()) { + PipelineRunContext value = entry.getValue(); + BaseRunContext runContext = value.getRunContext(parentContextDef.getId()); + if (runContext!=null) { + parentRunContext = runContext; + break; + } + } + if (parentRunContext==null) { + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到父级context"); + } + parentRunContext.putChildRunContext(id,context); + } } @Override public BaseRunContext getContext(String key) { + for (Map.Entry entry : contextMap.entrySet()) { + PipelineRunContext value = entry.getValue(); + BaseRunContext runContext = value.getRunContext(key); + if (runContext!=null) { + return runContext; + } + } return null; } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java new file mode 100644 index 00000000..08da74bb --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java @@ -0,0 +1,15 @@ +package cd.casic.ci.process.engine.manager.impl; + +import cd.casic.ci.process.engine.message.TaskRunMessage; +import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class DefaultWorkerManager extends AbstractRedisStreamMessageListener { + @Override + public void onMessage(TaskRunMessage message) { + log.info("===============接收到消息================"); + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/message/TaskRunMessage.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/message/TaskRunMessage.java new file mode 100644 index 00000000..7365e6e7 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/message/TaskRunMessage.java @@ -0,0 +1,20 @@ +package cd.casic.ci.process.engine.message; + +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage; +import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessage; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +@EqualsAndHashCode(callSuper = true) +@Data +@NoArgsConstructor +public class TaskRunMessage extends AbstractRedisStreamMessage { + private PipTask task; + + public TaskRunMessage(PipTask task) { + this.task = task; + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index 94cacbe5..78098055 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -58,7 +58,7 @@ public abstract class BaseRunContext { /** * 获取当前或者子上下文 * */ - public abstract BaseRunContext getChildRunContext(String key); + public abstract BaseRunContext getRunContext(String key); public abstract void putChildRunContext(String key,BaseRunContext context); public void callParentChange(ContextStateEnum state){ if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java index 3aedf6fa..6940845b 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java @@ -35,10 +35,10 @@ public class PipelineRunContext extends BaseRunContext{ * pipeline 底下有多个阶段,多个阶段包含多个task 不保存第二级context * */ @Override - public BaseRunContext getChildRunContext(String stageId) { + public BaseRunContext getRunContext(String stageId) { Map childContext = getChildContext(); for (Map.Entry entry : childContext.entrySet()) { - BaseRunContext childRunContext = entry.getValue().getChildRunContext(stageId); + BaseRunContext childRunContext = entry.getValue().getRunContext(stageId); if (childRunContext!=null) { return childRunContext; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java index d02a623a..fc02548e 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java @@ -10,15 +10,18 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class SecondStageRunContext extends BaseRunContext{ - public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map globalVariables, Map localVariables) { - super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), globalVariables, localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size())); + public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map localVariables) { + super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size())); } @Override - public BaseRunContext getChildRunContext(String taskId) { + public BaseRunContext getRunContext(String taskId) { + if (this.getContextDef().getId().equals(taskId)) { + return this; + } Map childContext = getChildContext(); for (Map.Entry entry : childContext.entrySet()) { - BaseRunContext childRunContext = entry.getValue().getChildRunContext(taskId); + BaseRunContext childRunContext = entry.getValue().getRunContext(taskId); if (childRunContext!=null) { return childRunContext; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java index a0118e96..423c602f 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java @@ -11,8 +11,8 @@ import java.util.HashMap; import java.util.Map; public class TaskRunContext extends BaseRunContext{ - public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables) { - super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>()); + public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext) { + super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), new HashMap<>(), new HashMap<>()); } private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext); @@ -22,7 +22,7 @@ public class TaskRunContext extends BaseRunContext{ * task是最后一层没有下一级所以如果id相同直接返回它自己 * */ @Override - public BaseRunContext getChildRunContext(String id) { + public BaseRunContext getRunContext(String id) { if (!StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) { return null; }