From e2affe4487fc1369fd246660e7091f9f2773ecbf Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Sat, 17 May 2025 17:28:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=BC=95=E6=93=8E=E9=83=A8?= =?UTF-8?q?=E5=88=86=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BF=AE=E6=94=B9redis?= =?UTF-8?q?=E5=BF=85=E9=A1=BB=E4=BD=BF=E7=94=A85=E4=BB=A5=E4=B8=8A?= =?UTF-8?q?=E7=89=88=E6=9C=AC=EF=BC=88=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ci/common/pipeline/annotation/Plugin.java | 7 ++ .../process/engine/config/ExecutorConfig.java | 22 ++++-- .../dispatcher/impl/SerialDispatcher.java | 2 +- .../impl/DefaultPipelineExecutor.java | 10 ++- .../manager/impl/DefaultWorkerManager.java | 68 ++++++++++++++++++- .../engine/runContext/BaseRunContext.java | 27 ++++++-- .../ci/process/engine/worker/BaseWorker.java | 52 ++++++++++++++ .../ci/process/engine/worker/TestWorker.java | 23 +++++++ .../service/task/impl/TaskServiceImpl.java | 12 ---- .../process/service/worker/TestWorker.java | 7 -- 10 files changed, 192 insertions(+), 38 deletions(-) create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/BaseWorker.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java delete mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/worker/TestWorker.java diff --git a/modules/module-ci-common-pipeline/src/main/java/cd/casic/ci/common/pipeline/annotation/Plugin.java b/modules/module-ci-common-pipeline/src/main/java/cd/casic/ci/common/pipeline/annotation/Plugin.java index 0d261c57..e5057a09 100644 --- a/modules/module-ci-common-pipeline/src/main/java/cd/casic/ci/common/pipeline/annotation/Plugin.java +++ b/modules/module-ci-common-pipeline/src/main/java/cd/casic/ci/common/pipeline/annotation/Plugin.java @@ -1,5 +1,7 @@ package cd.casic.ci.common.pipeline.annotation; +import org.springframework.core.annotation.AliasFor; +import org.springframework.stereotype.Component; import org.springframework.stereotype.Indexed; import java.lang.annotation.*; @@ -8,5 +10,10 @@ import java.lang.annotation.*; @Retention(RetentionPolicy.RUNTIME) @Documented @Indexed +@Component public @interface Plugin { + @AliasFor("value") + String taskType() default ""; + @AliasFor("taskType") + String value() default ""; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java index 22b376a2..a65297a9 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java @@ -10,25 +10,37 @@ import java.util.concurrent.ThreadPoolExecutor; @Configuration public class ExecutorConfig { - @Bean("pipelineExecutor") + @Bean("parallelExecutor") public ThreadPoolTaskExecutor pipelineExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(100); - executor.setThreadNamePrefix("Pipeline-"); + executor.setThreadNamePrefix("Parallel-"); ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); executor.setRejectedExecutionHandler(callerRunsPolicy); executor.initialize(); // 必须手动触发初始化 return executor; } - @Bean("taskExecutor") - public ThreadPoolTaskExecutor taskExecutor() { + @Bean("serialExecutor") + public ThreadPoolTaskExecutor serialExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(100); - executor.setThreadNamePrefix("Task-"); + executor.setThreadNamePrefix("Serial-"); + ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); + executor.setRejectedExecutionHandler(callerRunsPolicy); + executor.initialize(); // 必须手动触发初始化 + return executor; + } + @Bean("workerExecutor") + public ThreadPoolTaskExecutor workerExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("Worker-"); ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); executor.setRejectedExecutionHandler(callerRunsPolicy); executor.initialize(); // 必须手动触发初始化 diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index e1999a84..515b146b 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -42,7 +42,7 @@ public class SerialDispatcher implements BaseDispatcher { // 注册taskContext,且发送消息至消息队列给work执行 TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext); contextManager.contextRegister(taskRunContext); - taskRunContext.setState(new AtomicInteger(ContextStateEnum.READY.getCode())); + taskRunContext.changeContextState(ContextStateEnum.READY); TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask); redisMQTemplate.send(taskRunMessage); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java index 2d41fdd9..703a43c1 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java @@ -34,11 +34,9 @@ public class DefaultPipelineExecutor implements PipelineExecutor { @Resource private RunContextManager runContextManager; @Resource - @Qualifier("pipelineExecutor") - private ThreadPoolTaskExecutor pipelineExecutor; + private ThreadPoolTaskExecutor parallelExecutor; @Resource - @Qualifier("taskExecutor") - private ThreadPoolTaskExecutor taskExecutor; + private ThreadPoolTaskExecutor serialExecutor; @Resource private RedisMQTemplate redisMQTemplate; @Override @@ -55,8 +53,8 @@ public class DefaultPipelineExecutor implements PipelineExecutor { throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息"); } PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); - ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,taskExecutor); - taskExecutor.execute(parallelDispatcher); + ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor); + parallelExecutor.execute(parallelDispatcher); return pipelineRunContext; } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java index 08da74bb..dbed8fe6 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java @@ -1,15 +1,81 @@ package cd.casic.ci.process.engine.manager.impl; +import cd.casic.ci.common.pipeline.annotation.Plugin; import cd.casic.ci.process.engine.message.TaskRunMessage; +import cd.casic.ci.process.engine.worker.BaseWorker; +import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import jodd.util.StringUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider; +import org.springframework.core.type.filter.AnnotationTypeFilter; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + @Component @Slf4j -public class DefaultWorkerManager extends AbstractRedisStreamMessageListener { +public class DefaultWorkerManager extends AbstractRedisStreamMessageListener implements ApplicationContextAware { + private static final String basePackage = "cd.casic.ci.process.engine.worker"; + private Set candidates; + private ApplicationContext applicationContext; + private Map taskTypeWorkerMap = null; + @Resource + private ThreadPoolTaskExecutor workerExecutor; + + private void setTaskTypeWorker(Map taskTypeWorker) { + this.taskTypeWorkerMap = taskTypeWorker; + } + + @PostConstruct + public void init(){ + ClassPathScanningCandidateComponentProvider provider = + new ClassPathScanningCandidateComponentProvider(false); + provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class)); + candidates = provider.findCandidateComponents(basePackage); + taskTypeWorkerMap = new HashMap<>(candidates.size()); + for (BeanDefinition candidate : candidates) { + String beanClassName = candidate.getBeanClassName(); + Class workerClass = null; + try { + workerClass = Class.forName(beanClassName); + } catch (ClassNotFoundException e) { + continue; + } + Object bean = applicationContext.getBean(workerClass); + if (bean instanceof BaseWorker worker) { + Plugin annotation = worker.getClass().getAnnotation(Plugin.class); + String taskType = annotation.taskType(); + if (StringUtils.isEmpty(taskType)) { + taskTypeWorkerMap.put(taskType,worker); + } + } + } + log.info("================WorkerManager初始化完毕"); + } @Override public void onMessage(TaskRunMessage message) { log.info("===============接收到消息================"); + PipTask task = message.getTask(); + String taskType = task.getTaskType(); + BaseWorker baseWorker = taskTypeWorkerMap.get(taskType); + baseWorker.setContextKey(task.getId()); + workerExecutor.execute(baseWorker); + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index 78098055..6c45e759 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -54,6 +54,17 @@ public abstract class BaseRunContext { this.localVariables = localVariables; this.childContext = childContext; } + public void changeContextState(ContextStateEnum stateEnum){ + ContextStateEnum curr = ContextStateEnum.getByCode(state.get()); + if (ContextStateEnum.canGoto(curr,stateEnum)) { + state.compareAndExchange(curr.getCode(),stateEnum.getCode()); + callParentChange(stateEnum); + } + } + + private void setState(AtomicInteger state) { + this.state = state; + } /** * 获取当前或者子上下文 @@ -61,13 +72,17 @@ public abstract class BaseRunContext { public abstract BaseRunContext getRunContext(String key); public abstract void putChildRunContext(String key,BaseRunContext context); public void callParentChange(ContextStateEnum state){ - if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { - checkChildEnd(); - } else if(ContextStateEnum.READY.equals(state)){ - checkChildReady(); - } else if(ContextStateEnum.RUNNING.equals(state)){ - checkChildRunning(); + if (parentContext==null) { + return; } + if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { + parentContext.checkChildEnd(); + } else if(ContextStateEnum.READY.equals(state)){ + parentContext.checkChildReady(); + } else if(ContextStateEnum.RUNNING.equals(state)){ + parentContext.checkChildRunning(); + } + parentContext.callParentChange(state); } /** * 查找子类是否全部完成,如果子类全部完成则父类也完成 diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/BaseWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/BaseWorker.java new file mode 100644 index 00000000..634390f7 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/BaseWorker.java @@ -0,0 +1,52 @@ +package cd.casic.ci.process.engine.worker; + +import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.enums.ContextStateEnum; +import cd.casic.ci.process.engine.manager.RunContextManager; +import cd.casic.ci.process.engine.runContext.BaseRunContext; +import cd.casic.ci.process.engine.runContext.PipelineRunContext; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.framework.commons.exception.ServiceException; +import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import jodd.util.StringUtil; +import lombok.Data; +import org.apache.commons.lang3.StringUtils; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + + +@Data +public abstract class BaseWorker implements Runnable{ + // 一些属性 + @Resource + private RunContextManager contextManager; + private String contextKey; + + @PostConstruct + public void initName(){ + + } + @Override + public void run() { + if (StringUtils.isEmpty(contextKey)) { + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"请设置当前worker的contextKey"); + } + doWorker(contextKey); + } + + public void doWorker(String contextKey){ + BaseRunContext context = contextManager.getContext(contextKey); + if (context instanceof TaskRunContext taskRunContext){ + try { + execute(taskRunContext); + } catch (Exception e) { + taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING); + } + // TODO 执行结束修改context的state,并且通知父类 + taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING); + } + } + public abstract void execute(TaskRunContext context); + +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java new file mode 100644 index 00000000..d633d6b7 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java @@ -0,0 +1,23 @@ +package cd.casic.ci.process.engine.worker; + +import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + + +@Slf4j +@Plugin("testTask") +public class TestWorker extends BaseWorker{ + + + @Override + public void execute(TaskRunContext context) { + PipBaseElement contextDef = context.getContextDef(); + String id = contextDef.getId(); + log.info("==============触发worker执行========"); + log.info("==========运行context:{}===========", JSON.toJSONString(context)); + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/task/impl/TaskServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/task/impl/TaskServiceImpl.java index 4c590c9f..d05c6e89 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/task/impl/TaskServiceImpl.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/task/impl/TaskServiceImpl.java @@ -32,22 +32,10 @@ import java.util.Set; @Service public class TaskServiceImpl extends ServiceImpl implements TaskService { - private static final String basePackage = "cd.casic.ci.process.process.service"; - private Set candidates; @Resource private PipTaskDao taskDao; - @PostConstruct - public void init(){ - ClassPathScanningCandidateComponentProvider provider = - new ClassPathScanningCandidateComponentProvider(false); - provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class)); - candidates = provider.findCandidateComponents(basePackage); - for (BeanDefinition candidate : candidates) { - } - - } @Override public void taskTypeExist(String taskType) { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/worker/TestWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/worker/TestWorker.java deleted file mode 100644 index c2a93bea..00000000 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/worker/TestWorker.java +++ /dev/null @@ -1,7 +0,0 @@ -package cd.casic.ci.process.process.service.worker; - -import cd.casic.ci.common.pipeline.annotation.Plugin; - -@Plugin -public class TestWorker { -}