执行引擎部分代码(未完成)

This commit is contained in:
even 2025-05-17 00:56:49 +08:00
parent 734f44dfd3
commit ddf3ff7069
13 changed files with 187 additions and 55 deletions

View File

@ -0,0 +1,25 @@
package cd.casic.ci.process.engine.config;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ExecutorConfig {
@Bean("pipelineExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Pipeline-");
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.initialize(); // 必须手动触发初始化
return executor;
}
}

View File

@ -5,6 +5,6 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import java.util.List; import java.util.List;
public interface BaseDispatcher { public interface BaseDispatcher extends Runnable{
void dispatch(); void dispatch() throws InterruptedException;
} }

View File

@ -1,26 +1,67 @@
package cd.casic.ci.process.engine.dispatcher.impl; package cd.casic.ci.process.engine.dispatcher.impl;
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.PipelineRunContext; import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.dataObject.stage.PipStage;
import org.springframework.core.task.TaskExecutor;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
public class ParallelDispatcher implements BaseDispatcher { public class ParallelDispatcher implements BaseDispatcher{
private List<PipStage> firstStageList; private List<PipStage> firstStageList;
private Integer stageIndex; private Integer stageIndex;
private PipelineRunContext context; private PipelineRunContext pipelineRunContext;
private RunContextManager runContextManager;
public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context) { public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager) {
this.firstStageList = firstStageList; this.firstStageList = firstStageList;
this.context = context; this.pipelineRunContext = context;
this.stageIndex = 0; this.stageIndex = 0;
this.runContextManager = contextManager;
contextManager.contextRegister(context);
} }
@Override @Override
public void dispatch() { public void dispatch() throws InterruptedException {
// 负责依次执行阶段
for (PipStage stage : firstStageList) {
List<PipStage> stageList = stage.getStageList();
if (CollectionUtils.isEmpty(stageList)) {
// 此处可以暂时记录日志
continue;
}
CountDownLatch latch = new CountDownLatch(stageList.size());
for (PipStage secondStage : stageList) {
// 二阶段下所有task是串行所以不用关心线程安全相关信息
SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new HashMap<>(),new HashMap<>());
runContextManager.contextRegister(context);
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch);
// 给线程池进行执行
}
latch.await();
}
} }
@Override
public void run() {
// TODO 计时
try {
dispatch();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
} }

View File

@ -1,17 +1,43 @@
package cd.casic.ci.process.engine.dispatcher.impl; package cd.casic.ci.process.engine.dispatcher.impl;
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
import cd.casic.ci.process.engine.runContext.StageRunContext; import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.ci.process.process.dataObject.task.PipTask;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
public class SerialDispatcher implements BaseDispatcher { public class SerialDispatcher implements BaseDispatcher {
private StageRunContext stageRunContext; private SecondStageRunContext stageRunContext;
private List<PipTask> itemList; private List<PipTask> taskList;
private CountDownLatch latch;
public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) {
this.stageRunContext = stageRunContext;
PipBaseElement contextDef = stageRunContext.getContextDef();
if (contextDef instanceof PipStage) {
this.taskList = ((PipStage)contextDef).getTaskValues();
}
this.latch = latch;
}
@Override @Override
public void dispatch() { public void dispatch() {
for (PipTask pipTask : taskList) {
// TODO 注册taskContext且发送消息至消息队列给work执行
}
}
@Override
public void run() {
try {
dispatch();
latch.countDown();
} catch (Exception e) {
throw new RuntimeException(e);
}
} }
} }

View File

@ -11,6 +11,7 @@ public enum ContextStateEnum {
{ {
add(READY); add(READY);
add(SUSPEND); add(SUSPEND);
add(BAD_ENDING);
add(STOP); add(STOP);
} }
}), }),
@ -18,6 +19,7 @@ public enum ContextStateEnum {
{ {
add(RUNNING); add(RUNNING);
add(SUSPEND); add(SUSPEND);
add(BAD_ENDING);
add(STOP); add(STOP);
} }
}), }),
@ -34,6 +36,7 @@ public enum ContextStateEnum {
add(INIT); add(INIT);
add(READY); add(READY);
add(RUNNING); add(RUNNING);
add(BAD_ENDING);
} }
}), }),
STOP(-1,"停止", new HashSet<>()), STOP(-1,"停止", new HashSet<>()),

View File

@ -10,10 +10,16 @@ import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.service.pipeline.PipelineService; import cd.casic.ci.process.process.service.pipeline.PipelineService;
import cd.casic.ci.process.process.service.stage.StageService; import cd.casic.ci.process.process.service.stage.StageService;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -26,6 +32,9 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
private StageService stageService; private StageService stageService;
@Resource @Resource
private RunContextManager runContextManager; private RunContextManager runContextManager;
@Resource
@Qualifier("pipelineExecutor")
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
public PipelineRunContext execute(String pipelineId) { public PipelineRunContext execute(String pipelineId) {
PipPipeline pipeline = pipelineService.getById(pipelineId); PipPipeline pipeline = pipelineService.getById(pipelineId);
@ -36,10 +45,12 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
String executeStatus = pipeline.getExecuteStatus(); String executeStatus = pipeline.getExecuteStatus();
// TODO 如果判断成功则查询所有的阶段信息 // TODO 如果判断成功则查询所有的阶段信息
List<PipStage> mainStage = stageService.findAllFirstStagesAndChild(pipelineId); List<PipStage> mainStage = stageService.findAllFirstStagesAndChild(pipelineId);
if (CollectionUtils.isEmpty(mainStage)) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
}
PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
runContextManager.contextRegister(pipelineRunContext); ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager);
// ParallelDispatcher parallelDispatcher = new ParallelDispatcher(); taskExecutor.execute(parallelDispatcher);
return pipelineRunContext;
return null;
} }
} }

View File

@ -12,7 +12,7 @@ public interface RunContextManager {
* */ * */
Boolean notifyPipeline(String pipelineId); Boolean notifyPipeline(String pipelineId);
/** /**
* 挂起流水线-预留 * 挂起流水线-预留 TODO 根据配置判断是否存入数据库恢复执行再加载入内存
* */ * */
Boolean suspendPipeline(String pipelineId); Boolean suspendPipeline(String pipelineId);
/** /**
@ -20,11 +20,11 @@ public interface RunContextManager {
* */ * */
Boolean notifyStage(String pipelineId,String stageId); Boolean notifyStage(String pipelineId,String stageId);
/** /**
* 挂起子阶段-预留 * 挂起子阶段-预留 TODO 根据配置判断是否存入数据库恢复执行再加载入内存
* */ * */
Boolean suspendStage(String pipelineId,String stageId); Boolean suspendStage(String pipelineId,String stageId);
/** /**
* 判断相应的context类型放入注册Map中 * 判断相应的context类型放入注册Map中自动维护父子context关系
* */ * */
void contextRegister(BaseRunContext context); void contextRegister(BaseRunContext context);
BaseRunContext getContext(String key); BaseRunContext getContext(String key);

View File

@ -63,10 +63,14 @@ public abstract class BaseRunContext {
public void callParentChange(ContextStateEnum state){ public void callParentChange(ContextStateEnum state){
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
checkChildEnd(); checkChildEnd();
} else if(ContextStateEnum.READY.equals(state)){
checkChildReady();
} else if(ContextStateEnum.RUNNING.equals(state)){
checkChildRunning();
} }
} }
/** /**
* 查找子类是否全部完成如果子类全部完成则父类也全部完成 * 查找子类是否全部完成如果子类全部完成则父类也完成
* */ * */
public void checkChildEnd() throws ServiceException{ public void checkChildEnd() throws ServiceException{
int result = ContextStateEnum.HAPPY_ENDING.getCode(); int result = ContextStateEnum.HAPPY_ENDING.getCode();
@ -94,4 +98,49 @@ public abstract class BaseRunContext {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
} }
} }
/**
* 查找子类是否全部就绪如果子类全部完成则父类也就绪
* */
public void checkChildReady() throws ServiceException{
int result = ContextStateEnum.READY.getCode();
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
BaseRunContext child = entry.getValue();
int state = child.getState().get();
if (!ContextStateEnum.READY.getCode().equals(state)) {
return;
}
result&=state;
}
boolean ready = false;
if (ContextStateEnum.READY.getCode()==result) {
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.READY)) {
this.state.compareAndExchange(state.get(),ContextStateEnum.READY.getCode());
ready = true;
}
}
if (!ready) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
}
}
/**
* 查找子类是否存在开始运行的如果有则父状态变成running
* */
public void checkChildRunning() throws ServiceException{
Boolean runningFlag = false;
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
BaseRunContext child = entry.getValue();
int state = child.getState().get();
if (ContextStateEnum.READY.getCode().equals(state)) {
runningFlag = true;
break;
}
}
if (runningFlag) {
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING)) {
this.state.compareAndExchange(state.get(),ContextStateEnum.RUNNING.getCode());
} else{
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
}
}
}
} }

View File

@ -1,22 +0,0 @@
package cd.casic.ci.process.engine.runContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import java.time.LocalDateTime;
import java.util.Map;
public class MainStageContext extends BaseRunContext{
public MainStageContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
}
@Override
public BaseRunContext getChildRunContext(String key) {
return null;
}
@Override
public void putChildRunContext(String key, BaseRunContext context) {
}
}

View File

@ -1,23 +1,22 @@
package cd.casic.ci.process.engine.runContext; package cd.casic.ci.process.engine.runContext;
import cd.casic.ci.process.api.process.pojo.Pipeline;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class PipelineRunContext extends BaseRunContext{ public class PipelineRunContext extends BaseRunContext{
public PipelineRunContext(BaseRunContext parentContext,PipPipeline pipeline,Map<String, Object> globalVariables,Map<String, Object> localVariables) { public PipelineRunContext(PipPipeline pipeline) {
this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>()); this(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
}
public PipelineRunContext(BaseRunContext parentContext, PipPipeline pipeline, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>());
} }
private PipelineRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) { private PipelineRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
@ -50,7 +49,7 @@ public class PipelineRunContext extends BaseRunContext{
@Override @Override
public void putChildRunContext(String key, BaseRunContext context) { public void putChildRunContext(String key, BaseRunContext context) {
Map<String, BaseRunContext> childContext = getChildContext(); Map<String, BaseRunContext> childContext = getChildContext();
if (context instanceof StageRunContext) { if (context instanceof SecondStageRunContext) {
childContext.put(key,context); childContext.put(key,context);
} else { } else {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"不支持类型"); throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"不支持类型");

View File

@ -1,15 +1,17 @@
package cd.casic.ci.process.engine.runContext; package cd.casic.ci.process.engine.runContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class StageRunContext extends BaseRunContext{ public class SecondStageRunContext extends BaseRunContext{
public StageRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) { public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext); super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), globalVariables, localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size()));
} }
@Override @Override

View File

@ -4,10 +4,6 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -15,7 +11,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class TaskRunContext extends BaseRunContext{ public class TaskRunContext extends BaseRunContext{
public TaskRunContext(PipTask contextDef, StageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables) { public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>()); super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>());
} }
private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) { private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {

View File

@ -1,8 +1,10 @@
package cd.casic.ci.process.process.dataObject.pipeline; package cd.casic.ci.process.process.dataObject.pipeline;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime; import java.time.LocalDateTime;