This commit is contained in:
HopeLi 2025-05-17 17:14:02 +08:00
commit 42dc4fb51e
14 changed files with 198 additions and 55 deletions

View File

@ -0,0 +1,25 @@
package cd.casic.ci.process.engine.config;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ExecutorConfig {
@Bean("pipelineExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Pipeline-");
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.initialize(); // 必须手动触发初始化
return executor;
}
}

View File

@ -5,6 +5,6 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import java.util.List;
public interface BaseDispatcher {
void dispatch();
public interface BaseDispatcher extends Runnable{
void dispatch() throws InterruptedException;
}

View File

@ -1,26 +1,67 @@
package cd.casic.ci.process.engine.dispatcher.impl;
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import org.springframework.core.task.TaskExecutor;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
public class ParallelDispatcher implements BaseDispatcher {
public class ParallelDispatcher implements BaseDispatcher{
private List<PipStage> firstStageList;
private Integer stageIndex;
private PipelineRunContext context;
private PipelineRunContext pipelineRunContext;
private RunContextManager runContextManager;
public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context) {
public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager) {
this.firstStageList = firstStageList;
this.context = context;
this.pipelineRunContext = context;
this.stageIndex = 0;
this.runContextManager = contextManager;
contextManager.contextRegister(context);
}
@Override
public void dispatch() {
public void dispatch() throws InterruptedException {
// 负责依次执行阶段
for (PipStage stage : firstStageList) {
List<PipStage> stageList = stage.getStageList();
if (CollectionUtils.isEmpty(stageList)) {
// 此处可以暂时记录日志
continue;
}
CountDownLatch latch = new CountDownLatch(stageList.size());
for (PipStage secondStage : stageList) {
// 二阶段下所有task是串行所以不用关心线程安全相关信息
SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new HashMap<>(),new HashMap<>());
runContextManager.contextRegister(context);
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch);
// 给线程池进行执行
}
latch.await();
}
}
@Override
public void run() {
// TODO 计时
try {
dispatch();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,17 +1,49 @@
package cd.casic.ci.process.engine.dispatcher.impl;
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
import cd.casic.ci.process.engine.runContext.StageRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class SerialDispatcher implements BaseDispatcher {
private StageRunContext stageRunContext;
private List<PipTask> itemList;
private SecondStageRunContext stageRunContext;
private List<PipTask> taskList;
private CountDownLatch latch;
private String triggerModel;
public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) {
this.stageRunContext = stageRunContext;
PipBaseElement contextDef = stageRunContext.getContextDef();
if (contextDef instanceof PipStage secondStage) {
this.triggerModel = secondStage.getTriggerMode();
this.taskList = secondStage.getTaskValues();
}
this.latch = latch;
}
@Override
public void dispatch() {
for (PipTask pipTask : taskList) {
// TODO 注册taskContext且发送消息至消息队列给work执行
}
}
@Override
public void run() {
try {
// TODO 检测触发方式如果需要手动触发挂起当前stage等待父级执行相应操作
// TODO 看看需要内存入库还是忽略掉当前执行进行入库countDown放行
// stageRunContext.callParentChange();
dispatch();
latch.countDown();
} catch (Exception e) {
// throw new RuntimeException(e);
// TODO 当前stage标记为失败等待父context发现并处理
}
}
}

View File

@ -11,6 +11,7 @@ public enum ContextStateEnum {
{
add(READY);
add(SUSPEND);
add(BAD_ENDING);
add(STOP);
}
}),
@ -18,6 +19,7 @@ public enum ContextStateEnum {
{
add(RUNNING);
add(SUSPEND);
add(BAD_ENDING);
add(STOP);
}
}),
@ -34,6 +36,7 @@ public enum ContextStateEnum {
add(INIT);
add(READY);
add(RUNNING);
add(BAD_ENDING);
}
}),
STOP(-1,"停止", new HashSet<>()),

View File

@ -10,10 +10,16 @@ import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.service.pipeline.PipelineService;
import cd.casic.ci.process.process.service.stage.StageService;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import jakarta.annotation.Resource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -26,6 +32,9 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
private StageService stageService;
@Resource
private RunContextManager runContextManager;
@Resource
@Qualifier("pipelineExecutor")
private ThreadPoolTaskExecutor taskExecutor;
@Override
public PipelineRunContext execute(String pipelineId) {
PipPipeline pipeline = pipelineService.getById(pipelineId);
@ -36,10 +45,12 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
String executeStatus = pipeline.getExecuteStatus();
// TODO 如果判断成功则查询所有的阶段信息
List<PipStage> mainStage = stageService.findAllFirstStagesAndChild(pipelineId);
if (CollectionUtils.isEmpty(mainStage)) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
}
PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
runContextManager.contextRegister(pipelineRunContext);
// ParallelDispatcher parallelDispatcher = new ParallelDispatcher();
return null;
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager);
taskExecutor.execute(parallelDispatcher);
return pipelineRunContext;
}
}

View File

@ -12,7 +12,7 @@ public interface RunContextManager {
* */
Boolean notifyPipeline(String pipelineId);
/**
* 挂起流水线-预留
* 挂起流水线-预留 TODO 根据配置判断是否存入数据库恢复执行再加载入内存
* */
Boolean suspendPipeline(String pipelineId);
/**
@ -20,11 +20,11 @@ public interface RunContextManager {
* */
Boolean notifyStage(String pipelineId,String stageId);
/**
* 挂起子阶段-预留
* 挂起子阶段-预留 TODO 根据配置判断是否存入数据库恢复执行再加载入内存
* */
Boolean suspendStage(String pipelineId,String stageId);
/**
* 判断相应的context类型放入注册Map中
* 判断相应的context类型放入注册Map中自动维护父子context关系
* */
void contextRegister(BaseRunContext context);
BaseRunContext getContext(String key);

View File

@ -63,10 +63,14 @@ public abstract class BaseRunContext {
public void callParentChange(ContextStateEnum state){
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
checkChildEnd();
} else if(ContextStateEnum.READY.equals(state)){
checkChildReady();
} else if(ContextStateEnum.RUNNING.equals(state)){
checkChildRunning();
}
}
/**
* 查找子类是否全部完成如果子类全部完成则父类也全部完成
* 查找子类是否全部完成如果子类全部完成则父类也完成
* */
public void checkChildEnd() throws ServiceException{
int result = ContextStateEnum.HAPPY_ENDING.getCode();
@ -94,4 +98,49 @@ public abstract class BaseRunContext {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
}
}
/**
* 查找子类是否全部就绪如果子类全部完成则父类也就绪
* */
public void checkChildReady() throws ServiceException{
int result = ContextStateEnum.READY.getCode();
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
BaseRunContext child = entry.getValue();
int state = child.getState().get();
if (!ContextStateEnum.READY.getCode().equals(state)) {
return;
}
result&=state;
}
boolean ready = false;
if (ContextStateEnum.READY.getCode()==result) {
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.READY)) {
this.state.compareAndExchange(state.get(),ContextStateEnum.READY.getCode());
ready = true;
}
}
if (!ready) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
}
}
/**
* 查找子类是否存在开始运行的如果有则父状态变成running
* */
public void checkChildRunning() throws ServiceException{
Boolean runningFlag = false;
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
BaseRunContext child = entry.getValue();
int state = child.getState().get();
if (ContextStateEnum.READY.getCode().equals(state)) {
runningFlag = true;
break;
}
}
if (runningFlag) {
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING)) {
this.state.compareAndExchange(state.get(),ContextStateEnum.RUNNING.getCode());
} else{
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
}
}
}
}

View File

@ -1,22 +0,0 @@
package cd.casic.ci.process.engine.runContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import java.time.LocalDateTime;
import java.util.Map;
public class MainStageContext extends BaseRunContext{
public MainStageContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
}
@Override
public BaseRunContext getChildRunContext(String key) {
return null;
}
@Override
public void putChildRunContext(String key, BaseRunContext context) {
}
}

View File

@ -1,23 +1,22 @@
package cd.casic.ci.process.engine.runContext;
import cd.casic.ci.process.api.process.pojo.Pipeline;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class PipelineRunContext extends BaseRunContext{
public PipelineRunContext(BaseRunContext parentContext,PipPipeline pipeline,Map<String, Object> globalVariables,Map<String, Object> localVariables) {
this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>());
public PipelineRunContext(PipPipeline pipeline) {
this(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
}
public PipelineRunContext(BaseRunContext parentContext, PipPipeline pipeline, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>());
}
private PipelineRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
@ -50,7 +49,7 @@ public class PipelineRunContext extends BaseRunContext{
@Override
public void putChildRunContext(String key, BaseRunContext context) {
Map<String, BaseRunContext> childContext = getChildContext();
if (context instanceof StageRunContext) {
if (context instanceof SecondStageRunContext) {
childContext.put(key,context);
} else {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"不支持类型");

View File

@ -1,15 +1,17 @@
package cd.casic.ci.process.engine.runContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class StageRunContext extends BaseRunContext{
public StageRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
public class SecondStageRunContext extends BaseRunContext{
public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), globalVariables, localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size()));
}
@Override

View File

@ -4,10 +4,6 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import java.time.LocalDateTime;
@ -15,7 +11,7 @@ import java.util.HashMap;
import java.util.Map;
public class TaskRunContext extends BaseRunContext{
public TaskRunContext(PipTask contextDef, StageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>());
}
private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {

View File

@ -1,5 +1,8 @@
package cd.casic.ci.process.process.dataObject.pipeline;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import lombok.Data;
import lombok.EqualsAndHashCode;

View File

@ -27,6 +27,10 @@ public class PipStage extends PipBaseElement {
//@ApiProperty(name = "code",desc="是否是源码")
private Boolean code = false;
/**
* 触发方式 0-手动触发/1-自动触发
* */
private String triggerMode;
//@ApiProperty(name = "taskValues",desc="阶段任务")
@TableField(exist = false)