执行引擎部分逻辑,修改redis必须使用5以上版本()

This commit is contained in:
even 2025-05-17 16:11:35 +08:00
parent 57a6f96138
commit 196162cf99
12 changed files with 144 additions and 20 deletions

View File

@ -11,7 +11,7 @@ import java.util.concurrent.ThreadPoolExecutor;
@Configuration @Configuration
public class ExecutorConfig { public class ExecutorConfig {
@Bean("pipelineExecutor") @Bean("pipelineExecutor")
public ThreadPoolTaskExecutor taskExecutor() { public ThreadPoolTaskExecutor pipelineExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); executor.setCorePoolSize(5);
executor.setMaxPoolSize(10); executor.setMaxPoolSize(10);
@ -22,4 +22,16 @@ public class ExecutorConfig {
executor.initialize(); // 必须手动触发初始化 executor.initialize(); // 必须手动触发初始化
return executor; return executor;
} }
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Task-");
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.initialize(); // 必须手动触发初始化
return executor;
}
} }

View File

@ -7,7 +7,9 @@ import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import org.springframework.core.task.TaskExecutor; import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.Collection; import java.util.Collection;
@ -24,13 +26,17 @@ public class ParallelDispatcher implements BaseDispatcher{
private Integer stageIndex; private Integer stageIndex;
private PipelineRunContext pipelineRunContext; private PipelineRunContext pipelineRunContext;
private RunContextManager runContextManager; private RunContextManager runContextManager;
private RedisMQTemplate redisMQTemplate;
private ThreadPoolTaskExecutor taskExecutor;
public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager) { public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager,RedisMQTemplate redisMQTemplate,ThreadPoolTaskExecutor taskExecutor) {
this.firstStageList = firstStageList; this.firstStageList = firstStageList;
this.pipelineRunContext = context; this.pipelineRunContext = context;
this.stageIndex = 0; this.stageIndex = 0;
this.runContextManager = contextManager; this.runContextManager = contextManager;
contextManager.contextRegister(context); contextManager.contextRegister(context);
this.redisMQTemplate = redisMQTemplate;
this.taskExecutor = taskExecutor;
} }
@Override @Override
@ -45,11 +51,12 @@ public class ParallelDispatcher implements BaseDispatcher{
CountDownLatch latch = new CountDownLatch(stageList.size()); CountDownLatch latch = new CountDownLatch(stageList.size());
for (PipStage secondStage : stageList) { for (PipStage secondStage : stageList) {
// 二阶段下所有task是串行所以不用关心线程安全相关信息 // 二阶段下所有task是串行所以不用关心线程安全相关信息
SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new HashMap<>(),new HashMap<>()); SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new ConcurrentHashMap<>());
runContextManager.contextRegister(context); runContextManager.contextRegister(context);
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch); SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate);
// 给线程池进行执行 // 给线程池进行执行
taskExecutor.execute(serialDispatcher);
} }
latch.await(); latch.await();
} }

View File

@ -1,22 +1,30 @@
package cd.casic.ci.process.engine.dispatcher.impl; package cd.casic.ci.process.engine.dispatcher.impl;
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.message.TaskRunMessage;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class SerialDispatcher implements BaseDispatcher { public class SerialDispatcher implements BaseDispatcher {
private SecondStageRunContext stageRunContext; private SecondStageRunContext stageRunContext;
private List<PipTask> taskList; private List<PipTask> taskList;
private CountDownLatch latch; private CountDownLatch latch;
private String triggerModel; private String triggerModel;
private RedisMQTemplate redisMQTemplate;
private RunContextManager contextManager;
public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) { public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch,RunContextManager contextManager,RedisMQTemplate redisMQTemplate) {
this.stageRunContext = stageRunContext; this.stageRunContext = stageRunContext;
PipBaseElement contextDef = stageRunContext.getContextDef(); PipBaseElement contextDef = stageRunContext.getContextDef();
if (contextDef instanceof PipStage secondStage) { if (contextDef instanceof PipStage secondStage) {
@ -24,12 +32,19 @@ public class SerialDispatcher implements BaseDispatcher {
this.taskList = secondStage.getTaskValues(); this.taskList = secondStage.getTaskValues();
} }
this.latch = latch; this.latch = latch;
this.redisMQTemplate = redisMQTemplate;
this.contextManager = contextManager;
} }
@Override @Override
public void dispatch() { public void dispatch() {
for (PipTask pipTask : taskList) { for (PipTask pipTask : taskList) {
// TODO 注册taskContext且发送消息至消息队列给work执行 // 注册taskContext且发送消息至消息队列给work执行
TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext);
contextManager.contextRegister(taskRunContext);
taskRunContext.setState(new AtomicInteger(ContextStateEnum.READY.getCode()));
TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
redisMQTemplate.send(taskRunMessage);
} }
} }
@ -40,10 +55,10 @@ public class SerialDispatcher implements BaseDispatcher {
// TODO 看看需要内存入库还是忽略掉当前执行进行入库countDown放行 // TODO 看看需要内存入库还是忽略掉当前执行进行入库countDown放行
// stageRunContext.callParentChange(); // stageRunContext.callParentChange();
dispatch(); dispatch();
latch.countDown();
} catch (Exception e) { } catch (Exception e) {
// throw new RuntimeException(e); // throw new RuntimeException(e);
// TODO 当前stage标记为失败等待父context发现并处理 // TODO 当前stage标记为失败等待父context发现并处理
} }
latch.countDown();
} }
} }

View File

@ -12,6 +12,7 @@ import cd.casic.ci.process.process.service.pipeline.PipelineService;
import cd.casic.ci.process.process.service.stage.StageService; import cd.casic.ci.process.process.service.stage.StageService;
import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@ -34,7 +35,12 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
private RunContextManager runContextManager; private RunContextManager runContextManager;
@Resource @Resource
@Qualifier("pipelineExecutor") @Qualifier("pipelineExecutor")
private ThreadPoolTaskExecutor pipelineExecutor;
@Resource
@Qualifier("taskExecutor")
private ThreadPoolTaskExecutor taskExecutor; private ThreadPoolTaskExecutor taskExecutor;
@Resource
private RedisMQTemplate redisMQTemplate;
@Override @Override
public PipelineRunContext execute(String pipelineId) { public PipelineRunContext execute(String pipelineId) {
PipPipeline pipeline = pipelineService.getById(pipelineId); PipPipeline pipeline = pipelineService.getById(pipelineId);
@ -49,7 +55,7 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息"); throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
} }
PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager); ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,taskExecutor);
taskExecutor.execute(parallelDispatcher); taskExecutor.execute(parallelDispatcher);
return pipelineRunContext; return pipelineRunContext;
} }

View File

@ -1,4 +1,6 @@
package cd.casic.ci.process.engine.manager; package cd.casic.ci.process.engine.manager;
/**
* 负责监听队列找到ContextManager获取runContext然后实际执行
* */
public interface WorkerManager { public interface WorkerManager {
} }

View File

@ -2,10 +2,20 @@ package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component @Component
public class DefaultRunContextManager implements RunContextManager { public class DefaultRunContextManager implements RunContextManager {
private final Map<String,PipelineRunContext> contextMap= new ConcurrentHashMap();
@Override @Override
public Boolean stopPipeline(String pipelineId) { public Boolean stopPipeline(String pipelineId) {
return null; return null;
@ -28,16 +38,50 @@ public class DefaultRunContextManager implements RunContextManager {
@Override @Override
public Boolean suspendStage(String pipelineId, String stageId) { public Boolean suspendStage(String pipelineId, String stageId) {
PipelineRunContext pipelineRunContext = contextMap.get(pipelineId);
if (pipelineRunContext !=null) {
}
return null; return null;
} }
@Override @Override
public void contextRegister(BaseRunContext context) { public void contextRegister(BaseRunContext context) {
PipBaseElement contextDef = context.getContextDef();
String id = contextDef.getId();
BaseRunContext parentContext = context.getParentContext();
if (context instanceof PipelineRunContext) {
contextMap.put(id,(PipelineRunContext)context);
} else {
if (parentContext==null) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"注册context失败");
}
PipBaseElement parentContextDef = parentContext.getContextDef();
BaseRunContext parentRunContext = null;
for (Map.Entry<String, PipelineRunContext> entry : contextMap.entrySet()) {
PipelineRunContext value = entry.getValue();
BaseRunContext runContext = value.getRunContext(parentContextDef.getId());
if (runContext!=null) {
parentRunContext = runContext;
break;
}
}
if (parentRunContext==null) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到父级context");
}
parentRunContext.putChildRunContext(id,context);
}
} }
@Override @Override
public BaseRunContext getContext(String key) { public BaseRunContext getContext(String key) {
for (Map.Entry<String, PipelineRunContext> entry : contextMap.entrySet()) {
PipelineRunContext value = entry.getValue();
BaseRunContext runContext = value.getRunContext(key);
if (runContext!=null) {
return runContext;
}
}
return null; return null;
} }
} }

View File

@ -0,0 +1,15 @@
package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.engine.message.TaskRunMessage;
import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DefaultWorkerManager extends AbstractRedisStreamMessageListener<TaskRunMessage> {
@Override
public void onMessage(TaskRunMessage message) {
log.info("===============接收到消息================");
}
}

View File

@ -0,0 +1,20 @@
package cd.casic.ci.process.engine.message;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
public class TaskRunMessage extends AbstractRedisStreamMessage {
private PipTask task;
public TaskRunMessage(PipTask task) {
this.task = task;
}
}

View File

@ -58,7 +58,7 @@ public abstract class BaseRunContext {
/** /**
* 获取当前或者子上下文 * 获取当前或者子上下文
* */ * */
public abstract BaseRunContext getChildRunContext(String key); public abstract BaseRunContext getRunContext(String key);
public abstract void putChildRunContext(String key,BaseRunContext context); public abstract void putChildRunContext(String key,BaseRunContext context);
public void callParentChange(ContextStateEnum state){ public void callParentChange(ContextStateEnum state){
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {

View File

@ -35,10 +35,10 @@ public class PipelineRunContext extends BaseRunContext{
* pipeline 底下有多个阶段多个阶段包含多个task 不保存第二级context * pipeline 底下有多个阶段多个阶段包含多个task 不保存第二级context
* */ * */
@Override @Override
public BaseRunContext getChildRunContext(String stageId) { public BaseRunContext getRunContext(String stageId) {
Map<String, BaseRunContext> childContext = getChildContext(); Map<String, BaseRunContext> childContext = getChildContext();
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) { for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
BaseRunContext childRunContext = entry.getValue().getChildRunContext(stageId); BaseRunContext childRunContext = entry.getValue().getRunContext(stageId);
if (childRunContext!=null) { if (childRunContext!=null) {
return childRunContext; return childRunContext;
} }

View File

@ -10,15 +10,18 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class SecondStageRunContext extends BaseRunContext{ public class SecondStageRunContext extends BaseRunContext{
public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> globalVariables, Map<String, Object> localVariables) { public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> localVariables) {
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), globalVariables, localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size())); super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size()));
} }
@Override @Override
public BaseRunContext getChildRunContext(String taskId) { public BaseRunContext getRunContext(String taskId) {
if (this.getContextDef().getId().equals(taskId)) {
return this;
}
Map<String, BaseRunContext> childContext = getChildContext(); Map<String, BaseRunContext> childContext = getChildContext();
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) { for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
BaseRunContext childRunContext = entry.getValue().getChildRunContext(taskId); BaseRunContext childRunContext = entry.getValue().getRunContext(taskId);
if (childRunContext!=null) { if (childRunContext!=null) {
return childRunContext; return childRunContext;
} }

View File

@ -11,8 +11,8 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class TaskRunContext extends BaseRunContext{ public class TaskRunContext extends BaseRunContext{
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables) { public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext) {
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>()); super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), new HashMap<>(), new HashMap<>());
} }
private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) { private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext); super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
@ -22,7 +22,7 @@ public class TaskRunContext extends BaseRunContext{
* task是最后一层没有下一级所以如果id相同直接返回它自己 * task是最后一层没有下一级所以如果id相同直接返回它自己
* */ * */
@Override @Override
public BaseRunContext getChildRunContext(String id) { public BaseRunContext getRunContext(String id) {
if (!StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) { if (!StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) {
return null; return null;
} }