diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java new file mode 100644 index 0000000..7ac1c45 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java @@ -0,0 +1,25 @@ +package cd.casic.ci.process.engine.config; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +public class ExecutorConfig { + @Bean("pipelineExecutor") + public ThreadPoolTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("Pipeline-"); + ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); + executor.setRejectedExecutionHandler(callerRunsPolicy); + executor.initialize(); // 必须手动触发初始化 + return executor; + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java index b032b82..0ea126d 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java @@ -5,6 +5,6 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import java.util.List; -public interface BaseDispatcher { - void dispatch(); +public interface BaseDispatcher extends Runnable{ + void dispatch() throws InterruptedException; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index 316f14e..c317d37 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -1,26 +1,67 @@ package cd.casic.ci.process.engine.dispatcher.impl; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; +import cd.casic.ci.process.engine.enums.ContextStateEnum; +import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.runContext.PipelineRunContext; +import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.stage.PipStage; +import org.springframework.core.task.TaskExecutor; +import org.springframework.util.CollectionUtils; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; -public class ParallelDispatcher implements BaseDispatcher { +public class ParallelDispatcher implements BaseDispatcher{ private List firstStageList; private Integer stageIndex; - private PipelineRunContext context; + private PipelineRunContext pipelineRunContext; + private RunContextManager runContextManager; - public ParallelDispatcher(List firstStageList, PipelineRunContext context) { + public ParallelDispatcher(List firstStageList, PipelineRunContext context,RunContextManager contextManager) { this.firstStageList = firstStageList; - this.context = context; + this.pipelineRunContext = context; this.stageIndex = 0; + this.runContextManager = contextManager; + contextManager.contextRegister(context); } @Override - public void dispatch() { + public void dispatch() throws InterruptedException { + // 负责依次执行阶段 + for (PipStage stage : firstStageList) { + List stageList = stage.getStageList(); + if (CollectionUtils.isEmpty(stageList)) { + // 此处可以暂时记录日志 + continue; + } + CountDownLatch latch = new CountDownLatch(stageList.size()); + for (PipStage secondStage : stageList) { + // 二阶段下所有task是串行所以不用关心线程安全相关信息 + SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new HashMap<>(),new HashMap<>()); + + runContextManager.contextRegister(context); + SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch); + // 给线程池进行执行 + } + latch.await(); + } } + @Override + public void run() { + // TODO 计时 + try { + dispatch(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index 49f2a2d..ff2dc94 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -1,17 +1,43 @@ package cd.casic.ci.process.engine.dispatcher.impl; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; -import cd.casic.ci.process.engine.runContext.StageRunContext; +import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; +import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.dataObject.task.PipTask; import java.util.List; +import java.util.concurrent.CountDownLatch; public class SerialDispatcher implements BaseDispatcher { - private StageRunContext stageRunContext; - private List itemList; + private SecondStageRunContext stageRunContext; + private List taskList; + private CountDownLatch latch; + + public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) { + this.stageRunContext = stageRunContext; + PipBaseElement contextDef = stageRunContext.getContextDef(); + if (contextDef instanceof PipStage) { + this.taskList = ((PipStage)contextDef).getTaskValues(); + } + this.latch = latch; + } + @Override public void dispatch() { + for (PipTask pipTask : taskList) { + // TODO 注册taskContext,且发送消息至消息队列给work执行 + } + } + @Override + public void run() { + try { + dispatch(); + latch.countDown(); + } catch (Exception e) { + throw new RuntimeException(e); + } } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java index 8263f10..a8e8ade 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java @@ -11,6 +11,7 @@ public enum ContextStateEnum { { add(READY); add(SUSPEND); + add(BAD_ENDING); add(STOP); } }), @@ -18,6 +19,7 @@ public enum ContextStateEnum { { add(RUNNING); add(SUSPEND); + add(BAD_ENDING); add(STOP); } }), @@ -34,6 +36,7 @@ public enum ContextStateEnum { add(INIT); add(READY); add(RUNNING); + add(BAD_ENDING); } }), STOP(-1,"停止", new HashSet<>()), diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java index cc0e919..b3a934a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java @@ -10,10 +10,16 @@ import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.service.pipeline.PipelineService; import cd.casic.ci.process.process.service.stage.StageService; +import cd.casic.framework.commons.exception.ServiceException; +import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import jakarta.annotation.Resource; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; +import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -26,6 +32,9 @@ public class DefaultPipelineExecutor implements PipelineExecutor { private StageService stageService; @Resource private RunContextManager runContextManager; + @Resource + @Qualifier("pipelineExecutor") + private ThreadPoolTaskExecutor taskExecutor; @Override public PipelineRunContext execute(String pipelineId) { PipPipeline pipeline = pipelineService.getById(pipelineId); @@ -36,10 +45,12 @@ public class DefaultPipelineExecutor implements PipelineExecutor { String executeStatus = pipeline.getExecuteStatus(); // TODO 如果判断成功则查询所有的阶段信息 List mainStage = stageService.findAllFirstStagesAndChild(pipelineId); + if (CollectionUtils.isEmpty(mainStage)) { + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息"); + } PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); - runContextManager.contextRegister(pipelineRunContext); -// ParallelDispatcher parallelDispatcher = new ParallelDispatcher(); - - return null; + ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager); + taskExecutor.execute(parallelDispatcher); + return pipelineRunContext; } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java index f3347c2..b0b8710 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java @@ -12,7 +12,7 @@ public interface RunContextManager { * */ Boolean notifyPipeline(String pipelineId); /** - * 挂起流水线-预留 + * 挂起流水线-预留 TODO 根据配置判断是否存入数据库,恢复执行再加载入内存 * */ Boolean suspendPipeline(String pipelineId); /** @@ -20,11 +20,11 @@ public interface RunContextManager { * */ Boolean notifyStage(String pipelineId,String stageId); /** - * 挂起子阶段-预留 + * 挂起子阶段-预留 TODO 根据配置判断是否存入数据库,恢复执行再加载入内存 * */ Boolean suspendStage(String pipelineId,String stageId); /** - * 判断相应的context类型,放入注册Map中 + * 判断相应的context类型,放入注册Map中(自动维护父子context关系) * */ void contextRegister(BaseRunContext context); BaseRunContext getContext(String key); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index 65fd83e..94cacbe 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -63,10 +63,14 @@ public abstract class BaseRunContext { public void callParentChange(ContextStateEnum state){ if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { checkChildEnd(); + } else if(ContextStateEnum.READY.equals(state)){ + checkChildReady(); + } else if(ContextStateEnum.RUNNING.equals(state)){ + checkChildRunning(); } } /** - * 查找子类是否全部完成,如果子类全部完成则父类也全部完成 + * 查找子类是否全部完成,如果子类全部完成则父类也完成 * */ public void checkChildEnd() throws ServiceException{ int result = ContextStateEnum.HAPPY_ENDING.getCode(); @@ -94,4 +98,49 @@ public abstract class BaseRunContext { throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); } } + /** + * 查找子类是否全部就绪,如果子类全部完成则父类也就绪 + * */ + public void checkChildReady() throws ServiceException{ + int result = ContextStateEnum.READY.getCode(); + for (Map.Entry entry : childContext.entrySet()) { + BaseRunContext child = entry.getValue(); + int state = child.getState().get(); + if (!ContextStateEnum.READY.getCode().equals(state)) { + return; + } + result&=state; + } + boolean ready = false; + if (ContextStateEnum.READY.getCode()==result) { + if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.READY)) { + this.state.compareAndExchange(state.get(),ContextStateEnum.READY.getCode()); + ready = true; + } + } + if (!ready) { + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); + } + } + /** + * 查找子类是否存在开始运行的,如果有则父状态变成running + * */ + public void checkChildRunning() throws ServiceException{ + Boolean runningFlag = false; + for (Map.Entry entry : childContext.entrySet()) { + BaseRunContext child = entry.getValue(); + int state = child.getState().get(); + if (ContextStateEnum.READY.getCode().equals(state)) { + runningFlag = true; + break; + } + } + if (runningFlag) { + if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING)) { + this.state.compareAndExchange(state.get(),ContextStateEnum.RUNNING.getCode()); + } else{ + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); + } + } + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/MainStageContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/MainStageContext.java deleted file mode 100644 index 1486cab..0000000 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/MainStageContext.java +++ /dev/null @@ -1,22 +0,0 @@ -package cd.casic.ci.process.engine.runContext; - -import cd.casic.ci.process.process.dataObject.base.PipBaseElement; - -import java.time.LocalDateTime; -import java.util.Map; - -public class MainStageContext extends BaseRunContext{ - public MainStageContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { - super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext); - } - - @Override - public BaseRunContext getChildRunContext(String key) { - return null; - } - - @Override - public void putChildRunContext(String key, BaseRunContext context) { - - } -} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java index 8a4b672..3aedf6f 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java @@ -1,23 +1,22 @@ package cd.casic.ci.process.engine.runContext; -import cd.casic.ci.process.api.process.pojo.Pipeline; -import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import java.time.LocalDateTime; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; public class PipelineRunContext extends BaseRunContext{ - public PipelineRunContext(BaseRunContext parentContext,PipPipeline pipeline,Map globalVariables,Map localVariables) { - this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>()); + public PipelineRunContext(PipPipeline pipeline) { + this(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); + } + public PipelineRunContext(BaseRunContext parentContext, PipPipeline pipeline, Map globalVariables, Map localVariables) { + this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>()); } private PipelineRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { @@ -50,7 +49,7 @@ public class PipelineRunContext extends BaseRunContext{ @Override public void putChildRunContext(String key, BaseRunContext context) { Map childContext = getChildContext(); - if (context instanceof StageRunContext) { + if (context instanceof SecondStageRunContext) { childContext.put(key,context); } else { throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"不支持类型"); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/StageRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java similarity index 66% rename from modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/StageRunContext.java rename to modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java index f61223e..d02a623 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/StageRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java @@ -1,15 +1,17 @@ package cd.casic.ci.process.engine.runContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import java.time.LocalDateTime; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -public class StageRunContext extends BaseRunContext{ - public StageRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { - super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext); +public class SecondStageRunContext extends BaseRunContext{ + public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map globalVariables, Map localVariables) { + super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), globalVariables, localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size())); } @Override diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java index 89a89f7..a0118e9 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java @@ -4,10 +4,6 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import java.time.LocalDateTime; @@ -15,7 +11,7 @@ import java.util.HashMap; import java.util.Map; public class TaskRunContext extends BaseRunContext{ - public TaskRunContext(PipTask contextDef, StageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables) { + public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables) { super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>()); } private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/pipeline/PipPipeline.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/pipeline/PipPipeline.java index 7839298..2212c6d 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/pipeline/PipPipeline.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/pipeline/PipPipeline.java @@ -1,8 +1,10 @@ package cd.casic.ci.process.process.dataObject.pipeline; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import lombok.Data; +import lombok.EqualsAndHashCode; import java.time.LocalDateTime;