执行引擎部分逻辑,修改redis必须使用5以上版本()

This commit is contained in:
even 2025-05-17 17:28:40 +08:00
parent 196162cf99
commit e2affe4487
10 changed files with 192 additions and 38 deletions

View File

@ -1,5 +1,7 @@
package cd.casic.ci.common.pipeline.annotation; package cd.casic.ci.common.pipeline.annotation;
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Indexed; import org.springframework.stereotype.Indexed;
import java.lang.annotation.*; import java.lang.annotation.*;
@ -8,5 +10,10 @@ import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Documented @Documented
@Indexed @Indexed
@Component
public @interface Plugin { public @interface Plugin {
@AliasFor("value")
String taskType() default "";
@AliasFor("taskType")
String value() default "";
} }

View File

@ -10,25 +10,37 @@ import java.util.concurrent.ThreadPoolExecutor;
@Configuration @Configuration
public class ExecutorConfig { public class ExecutorConfig {
@Bean("pipelineExecutor") @Bean("parallelExecutor")
public ThreadPoolTaskExecutor pipelineExecutor() { public ThreadPoolTaskExecutor pipelineExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); executor.setCorePoolSize(5);
executor.setMaxPoolSize(10); executor.setMaxPoolSize(10);
executor.setQueueCapacity(100); executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Pipeline-"); executor.setThreadNamePrefix("Parallel-");
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
executor.setRejectedExecutionHandler(callerRunsPolicy); executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.initialize(); // 必须手动触发初始化 executor.initialize(); // 必须手动触发初始化
return executor; return executor;
} }
@Bean("taskExecutor") @Bean("serialExecutor")
public ThreadPoolTaskExecutor taskExecutor() { public ThreadPoolTaskExecutor serialExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); executor.setCorePoolSize(5);
executor.setMaxPoolSize(10); executor.setMaxPoolSize(10);
executor.setQueueCapacity(100); executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Task-"); executor.setThreadNamePrefix("Serial-");
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.initialize(); // 必须手动触发初始化
return executor;
}
@Bean("workerExecutor")
public ThreadPoolTaskExecutor workerExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Worker-");
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
executor.setRejectedExecutionHandler(callerRunsPolicy); executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.initialize(); // 必须手动触发初始化 executor.initialize(); // 必须手动触发初始化

View File

@ -42,7 +42,7 @@ public class SerialDispatcher implements BaseDispatcher {
// 注册taskContext且发送消息至消息队列给work执行 // 注册taskContext且发送消息至消息队列给work执行
TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext); TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext);
contextManager.contextRegister(taskRunContext); contextManager.contextRegister(taskRunContext);
taskRunContext.setState(new AtomicInteger(ContextStateEnum.READY.getCode())); taskRunContext.changeContextState(ContextStateEnum.READY);
TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask); TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
redisMQTemplate.send(taskRunMessage); redisMQTemplate.send(taskRunMessage);
} }

View File

@ -34,11 +34,9 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
@Resource @Resource
private RunContextManager runContextManager; private RunContextManager runContextManager;
@Resource @Resource
@Qualifier("pipelineExecutor") private ThreadPoolTaskExecutor parallelExecutor;
private ThreadPoolTaskExecutor pipelineExecutor;
@Resource @Resource
@Qualifier("taskExecutor") private ThreadPoolTaskExecutor serialExecutor;
private ThreadPoolTaskExecutor taskExecutor;
@Resource @Resource
private RedisMQTemplate redisMQTemplate; private RedisMQTemplate redisMQTemplate;
@Override @Override
@ -55,8 +53,8 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息"); throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
} }
PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,taskExecutor); ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
taskExecutor.execute(parallelDispatcher); parallelExecutor.execute(parallelDispatcher);
return pipelineRunContext; return pipelineRunContext;
} }
} }

View File

@ -1,15 +1,81 @@
package cd.casic.ci.process.engine.manager.impl; package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.message.TaskRunMessage; import cd.casic.ci.process.engine.message.TaskRunMessage;
import cd.casic.ci.process.engine.worker.BaseWorker;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import jodd.util.StringUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@Component @Component
@Slf4j @Slf4j
public class DefaultWorkerManager extends AbstractRedisStreamMessageListener<TaskRunMessage> { public class DefaultWorkerManager extends AbstractRedisStreamMessageListener<TaskRunMessage> implements ApplicationContextAware {
private static final String basePackage = "cd.casic.ci.process.engine.worker";
private Set<BeanDefinition> candidates;
private ApplicationContext applicationContext;
private Map<String,BaseWorker> taskTypeWorkerMap = null;
@Resource
private ThreadPoolTaskExecutor workerExecutor;
private void setTaskTypeWorker(Map<String, BaseWorker> taskTypeWorker) {
this.taskTypeWorkerMap = taskTypeWorker;
}
@PostConstruct
public void init(){
ClassPathScanningCandidateComponentProvider provider =
new ClassPathScanningCandidateComponentProvider(false);
provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class));
candidates = provider.findCandidateComponents(basePackage);
taskTypeWorkerMap = new HashMap<>(candidates.size());
for (BeanDefinition candidate : candidates) {
String beanClassName = candidate.getBeanClassName();
Class<?> workerClass = null;
try {
workerClass = Class.forName(beanClassName);
} catch (ClassNotFoundException e) {
continue;
}
Object bean = applicationContext.getBean(workerClass);
if (bean instanceof BaseWorker worker) {
Plugin annotation = worker.getClass().getAnnotation(Plugin.class);
String taskType = annotation.taskType();
if (StringUtils.isEmpty(taskType)) {
taskTypeWorkerMap.put(taskType,worker);
}
}
}
log.info("================WorkerManager初始化完毕");
}
@Override @Override
public void onMessage(TaskRunMessage message) { public void onMessage(TaskRunMessage message) {
log.info("===============接收到消息================"); log.info("===============接收到消息================");
PipTask task = message.getTask();
String taskType = task.getTaskType();
BaseWorker baseWorker = taskTypeWorkerMap.get(taskType);
baseWorker.setContextKey(task.getId());
workerExecutor.execute(baseWorker);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
} }
} }

View File

@ -54,6 +54,17 @@ public abstract class BaseRunContext {
this.localVariables = localVariables; this.localVariables = localVariables;
this.childContext = childContext; this.childContext = childContext;
} }
public void changeContextState(ContextStateEnum stateEnum){
ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
if (ContextStateEnum.canGoto(curr,stateEnum)) {
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
callParentChange(stateEnum);
}
}
private void setState(AtomicInteger state) {
this.state = state;
}
/** /**
* 获取当前或者子上下文 * 获取当前或者子上下文
@ -61,13 +72,17 @@ public abstract class BaseRunContext {
public abstract BaseRunContext getRunContext(String key); public abstract BaseRunContext getRunContext(String key);
public abstract void putChildRunContext(String key,BaseRunContext context); public abstract void putChildRunContext(String key,BaseRunContext context);
public void callParentChange(ContextStateEnum state){ public void callParentChange(ContextStateEnum state){
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { if (parentContext==null) {
checkChildEnd(); return;
} else if(ContextStateEnum.READY.equals(state)){
checkChildReady();
} else if(ContextStateEnum.RUNNING.equals(state)){
checkChildRunning();
} }
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
parentContext.checkChildEnd();
} else if(ContextStateEnum.READY.equals(state)){
parentContext.checkChildReady();
} else if(ContextStateEnum.RUNNING.equals(state)){
parentContext.checkChildRunning();
}
parentContext.callParentChange(state);
} }
/** /**
* 查找子类是否全部完成如果子类全部完成则父类也完成 * 查找子类是否全部完成如果子类全部完成则父类也完成

View File

@ -0,0 +1,52 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import jodd.util.StringUtil;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Data
public abstract class BaseWorker implements Runnable{
// 一些属性
@Resource
private RunContextManager contextManager;
private String contextKey;
@PostConstruct
public void initName(){
}
@Override
public void run() {
if (StringUtils.isEmpty(contextKey)) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"请设置当前worker的contextKey");
}
doWorker(contextKey);
}
public void doWorker(String contextKey){
BaseRunContext context = contextManager.getContext(contextKey);
if (context instanceof TaskRunContext taskRunContext){
try {
execute(taskRunContext);
} catch (Exception e) {
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
}
// TODO 执行结束修改context的state,并且通知父类
taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING);
}
}
public abstract void execute(TaskRunContext context);
}

View File

@ -0,0 +1,23 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Slf4j
@Plugin("testTask")
public class TestWorker extends BaseWorker{
@Override
public void execute(TaskRunContext context) {
PipBaseElement contextDef = context.getContextDef();
String id = contextDef.getId();
log.info("==============触发worker执行========");
log.info("==========运行context{}===========", JSON.toJSONString(context));
}
}

View File

@ -32,22 +32,10 @@ import java.util.Set;
@Service @Service
public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements TaskService { public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements TaskService {
private static final String basePackage = "cd.casic.ci.process.process.service";
private Set<BeanDefinition> candidates;
@Resource @Resource
private PipTaskDao taskDao; private PipTaskDao taskDao;
@PostConstruct
public void init(){
ClassPathScanningCandidateComponentProvider provider =
new ClassPathScanningCandidateComponentProvider(false);
provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class));
candidates = provider.findCandidateComponents(basePackage);
for (BeanDefinition candidate : candidates) {
}
}
@Override @Override
public void taskTypeExist(String taskType) { public void taskTypeExist(String taskType) {

View File

@ -1,7 +0,0 @@
package cd.casic.ci.process.process.service.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
@Plugin
public class TestWorker {
}