Compare commits

..

11 Commits

19 changed files with 413 additions and 57 deletions

View File

@@ -1,5 +1,7 @@
 package cd.casic.ci.common.pipeline.annotation;
+import org.springframework.core.annotation.AliasFor;
+import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Indexed;
 import java.lang.annotation.*;
@@ -8,5 +10,7 @@ import java.lang.annotation.*;
 @Retention(RetentionPolicy.RUNTIME)
 @Documented
 @Indexed
+@Component
 public @interface Plugin {
+    String taskType();
 }

View File

@@ -10,13 +10,37 @@ import java.util.concurrent.ThreadPoolExecutor;
 @Configuration
 public class ExecutorConfig {
-    @Bean("pipelineExecutor")
-    public ThreadPoolTaskExecutor taskExecutor() {
+    @Bean("parallelExecutor")
+    public ThreadPoolTaskExecutor pipelineExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(5);
         executor.setMaxPoolSize(10);
         executor.setQueueCapacity(100);
-        executor.setThreadNamePrefix("Pipeline-");
+        executor.setThreadNamePrefix("Parallel-");
+        ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
+        executor.setRejectedExecutionHandler(callerRunsPolicy);
+        executor.initialize(); // initialization must be triggered manually
+        return executor;
+    }
+    @Bean("serialExecutor")
+    public ThreadPoolTaskExecutor serialExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(5);
+        executor.setMaxPoolSize(10);
+        executor.setQueueCapacity(100);
+        executor.setThreadNamePrefix("Serial-");
+        ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
+        executor.setRejectedExecutionHandler(callerRunsPolicy);
+        executor.initialize(); // initialization must be triggered manually
+        return executor;
+    }
+    @Bean("workerExecutor")
+    public ThreadPoolTaskExecutor workerExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(5);
+        executor.setMaxPoolSize(10);
+        executor.setQueueCapacity(100);
+        executor.setThreadNamePrefix("Worker-");
         ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
         executor.setRejectedExecutionHandler(callerRunsPolicy);
         executor.initialize(); // initialization must be triggered manually

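Side note (not part of the diff): the three pools above are later consumed by bean name. @Resource falls back to by-name resolution, so a field named exactly like the bean (parallelExecutor, serialExecutor, workerExecutor) wires up without a @Qualifier, which is why DefaultPipelineExecutor further down drops it. A minimal sketch under that assumption; the consumer class and method below are hypothetical:

// Hypothetical consumer, shown only to illustrate the by-name wiring of the three pools.
import jakarta.annotation.Resource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class ExecutorWiringSketch {
    // Field name matches @Bean("serialExecutor"), so no qualifier is needed.
    @Resource
    private ThreadPoolTaskExecutor serialExecutor;

    // Explicit bean name, so the field may be called anything.
    @Resource(name = "parallelExecutor")
    private ThreadPoolTaskExecutor stagePool;

    public void submitStageDispatcher(Runnable dispatcher) {
        // Dispatchers block on latch.await(), so they get their own pool and never
        // compete with the workers they are waiting for.
        stagePool.execute(dispatcher);
    }
}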
View File

@@ -7,7 +7,9 @@ import cd.casic.ci.process.engine.runContext.PipelineRunContext;
 import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
 import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
 import cd.casic.ci.process.process.dataObject.stage.PipStage;
+import cd.casic.framework.mq.redis.core.RedisMQTemplate;
 import org.springframework.core.task.TaskExecutor;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.util.CollectionUtils;
 import java.util.Collection;
@@ -24,13 +26,17 @@ public class ParallelDispatcher implements BaseDispatcher{
     private Integer stageIndex;
     private PipelineRunContext pipelineRunContext;
     private RunContextManager runContextManager;
+    private RedisMQTemplate redisMQTemplate;
+    private ThreadPoolTaskExecutor taskExecutor;
-    public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager) {
+    public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager,RedisMQTemplate redisMQTemplate,ThreadPoolTaskExecutor taskExecutor) {
         this.firstStageList = firstStageList;
         this.pipelineRunContext = context;
         this.stageIndex = 0;
         this.runContextManager = contextManager;
         contextManager.contextRegister(context);
+        this.redisMQTemplate = redisMQTemplate;
+        this.taskExecutor = taskExecutor;
     }
     @Override
@@ -45,15 +51,16 @@ public class ParallelDispatcher implements BaseDispatcher{
             CountDownLatch latch = new CountDownLatch(stageList.size());
             for (PipStage secondStage : stageList) {
                 // all tasks under a second-level stage run serially, so there are no thread-safety concerns here
-                SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new HashMap<>(),new HashMap<>());
+                SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new ConcurrentHashMap<>());
                 runContextManager.contextRegister(context);
-                SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch);
+                SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate);
                 // hand off to the thread pool for execution
+                taskExecutor.execute(serialDispatcher);
             }
+            // wait for the current stage to finish executing
             latch.await();
         }
+        // persist to the database
     }
     @Override
     public void run() {

View File

@@ -1,22 +1,31 @@
 package cd.casic.ci.process.engine.dispatcher.impl;
 import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
+import cd.casic.ci.process.engine.enums.ContextStateEnum;
+import cd.casic.ci.process.engine.manager.RunContextManager;
+import cd.casic.ci.process.engine.message.TaskRunMessage;
 import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
+import cd.casic.ci.process.engine.runContext.TaskRunContext;
 import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
 import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
 import cd.casic.ci.process.process.dataObject.stage.PipStage;
 import cd.casic.ci.process.process.dataObject.task.PipTask;
+import cd.casic.framework.mq.redis.core.RedisMQTemplate;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 public class SerialDispatcher implements BaseDispatcher {
     private SecondStageRunContext stageRunContext;
     private List<PipTask> taskList;
     private CountDownLatch latch;
     private String triggerModel;
+    private RedisMQTemplate redisMQTemplate;
+    private RunContextManager contextManager;
-    public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) {
+    public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch,RunContextManager contextManager,RedisMQTemplate redisMQTemplate) {
         this.stageRunContext = stageRunContext;
         PipBaseElement contextDef = stageRunContext.getContextDef();
         if (contextDef instanceof PipStage secondStage) {
@@ -24,12 +33,27 @@ public class SerialDispatcher implements BaseDispatcher {
             this.taskList = secondStage.getTaskValues();
         }
         this.latch = latch;
+        this.redisMQTemplate = redisMQTemplate;
+        this.contextManager = contextManager;
     }
     @Override
-    public void dispatch() {
+    public void dispatch() throws InterruptedException {
         for (PipTask pipTask : taskList) {
-            // TODO register the taskContext and send a message to the message queue for a worker to execute
+            // register the taskContext and send a message to the message queue for a worker to execute; pass parameters in if needed
+            TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>());
+            contextManager.contextRegister(taskRunContext);
+            taskRunContext.changeContextState(ContextStateEnum.READY);
+            TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
+            redisMQTemplate.send(taskRunMessage);
+            // TODO wait for this taskContext to reach success or failure (the worker flips the state to running / success / failure)
+            AtomicInteger state = taskRunContext.getState();
+            while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
+                    && state.get() != ContextStateEnum.BAD_ENDING.getCode()) {
+                Thread.sleep(1000L);
+            }
         }
     }
@@ -40,10 +64,10 @@ public class SerialDispatcher implements BaseDispatcher {
             // TODO decide whether to persist from memory or skip persisting the current run, then countDown to release
             // stageRunContext.callParentChange();
             dispatch();
+            latch.countDown();
         } catch (Exception e) {
             // throw new RuntimeException(e);
             // TODO mark the current stage as failed and let the parent context detect and handle it
         }
-        latch.countDown();
     }
 }

View File

@@ -12,6 +12,7 @@ import cd.casic.ci.process.process.service.pipeline.PipelineService;
 import cd.casic.ci.process.process.service.stage.StageService;
 import cd.casic.framework.commons.exception.ServiceException;
 import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
+import cd.casic.framework.mq.redis.core.RedisMQTemplate;
 import jakarta.annotation.Resource;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -33,8 +34,11 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
     @Resource
     private RunContextManager runContextManager;
     @Resource
-    @Qualifier("pipelineExecutor")
-    private ThreadPoolTaskExecutor taskExecutor;
+    private ThreadPoolTaskExecutor parallelExecutor;
+    @Resource
+    private ThreadPoolTaskExecutor serialExecutor;
+    @Resource
+    private RedisMQTemplate redisMQTemplate;
     @Override
     public PipelineRunContext execute(String pipelineId) {
         PipPipeline pipeline = pipelineService.getById(pipelineId);
@@ -48,9 +52,10 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
         if (CollectionUtils.isEmpty(mainStage)) {
             throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
         }
+        // for disaster recovery, the records stored in the database would have to be reloaded here in order
         PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
-        ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager);
-        taskExecutor.execute(parallelDispatcher);
+        ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
+        parallelExecutor.execute(parallelDispatcher);
         return pipelineRunContext;
     }
 }

View File

@@ -1,4 +1,6 @@
 package cd.casic.ci.process.engine.manager;
+/**
+ * Listens on the queue, looks up the runContext via the ContextManager, and performs the actual execution.
+ * */
 public interface WorkerManager {
 }

View File

@@ -2,10 +2,20 @@ package cd.casic.ci.process.engine.manager.impl;
 import cd.casic.ci.process.engine.manager.RunContextManager;
 import cd.casic.ci.process.engine.runContext.BaseRunContext;
+import cd.casic.ci.process.engine.runContext.PipelineRunContext;
+import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
+import cd.casic.ci.process.engine.runContext.TaskRunContext;
+import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
+import cd.casic.framework.commons.exception.ServiceException;
+import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
 import org.springframework.stereotype.Component;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 @Component
 public class DefaultRunContextManager implements RunContextManager {
+    private final Map<String,PipelineRunContext> contextMap = new ConcurrentHashMap<>();
     @Override
     public Boolean stopPipeline(String pipelineId) {
         return null;
@@ -28,16 +38,50 @@ public class DefaultRunContextManager implements RunContextManager {
     @Override
     public Boolean suspendStage(String pipelineId, String stageId) {
+        PipelineRunContext pipelineRunContext = contextMap.get(pipelineId);
+        if (pipelineRunContext != null) {
+        }
         return null;
     }
     @Override
     public void contextRegister(BaseRunContext context) {
+        PipBaseElement contextDef = context.getContextDef();
+        String id = contextDef.getId();
+        BaseRunContext parentContext = context.getParentContext();
+        if (context instanceof PipelineRunContext pipelineRunContext) {
+            contextMap.put(id,pipelineRunContext);
+        } else {
+            if (parentContext==null) {
+                throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"注册context失败");
+            }
+            PipBaseElement parentContextDef = parentContext.getContextDef();
+            BaseRunContext parentRunContext = null;
+            for (Map.Entry<String, PipelineRunContext> entry : contextMap.entrySet()) {
+                PipelineRunContext value = entry.getValue();
+                BaseRunContext runContext = value.getRunContext(parentContextDef.getId());
+                if (runContext!=null) {
+                    parentRunContext = runContext;
+                    break;
+                }
+            }
+            if (parentRunContext==null) {
+                throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到父级context");
+            }
+            parentRunContext.putChildRunContext(id,context);
+        }
     }
     @Override
     public BaseRunContext getContext(String key) {
+        for (Map.Entry<String, PipelineRunContext> entry : contextMap.entrySet()) {
+            PipelineRunContext value = entry.getValue();
+            BaseRunContext runContext = value.getRunContext(key);
+            if (runContext!=null) {
+                return runContext;
+            }
+        }
         return null;
     }
 }

View File

@@ -0,0 +1,86 @@
+package cd.casic.ci.process.engine.manager.impl;
+import cd.casic.ci.common.pipeline.annotation.Plugin;
+import cd.casic.ci.process.engine.message.TaskRunMessage;
+import cd.casic.ci.process.engine.worker.BaseWorker;
+import cd.casic.ci.process.process.dataObject.task.PipTask;
+import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.Resource;
+import jodd.util.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
+import org.springframework.core.annotation.AnnotatedElementUtils;
+import org.springframework.core.type.filter.AnnotationTypeFilter;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+@Component
+@Slf4j
+public class DefaultWorkerManager extends AbstractRedisStreamMessageListener<TaskRunMessage> implements ApplicationContextAware {
+    private static final String basePackage = "cd.casic.ci.process.engine.worker";
+    private Set<BeanDefinition> candidates;
+    private ApplicationContext applicationContext;
+    private Map<String,BaseWorker> taskTypeWorkerMap = null;
+    @Resource
+    private ThreadPoolTaskExecutor workerExecutor;
+    private void setTaskTypeWorker(Map<String, BaseWorker> taskTypeWorker) {
+        this.taskTypeWorkerMap = taskTypeWorker;
+    }
+    @PostConstruct
+    public void init(){
+        ClassPathScanningCandidateComponentProvider provider =
+                new ClassPathScanningCandidateComponentProvider(false);
+        provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class));
+        candidates = provider.findCandidateComponents(basePackage);
+        taskTypeWorkerMap = new HashMap<>(candidates.size());
+        for (BeanDefinition candidate : candidates) {
+            String beanClassName = candidate.getBeanClassName();
+            Class<?> workerClass = null;
+            try {
+                workerClass = Class.forName(beanClassName);
+            } catch (ClassNotFoundException e) {
+                continue;
+            }
+            Object bean = applicationContext.getBean(workerClass);
+            if (bean instanceof BaseWorker worker) {
+                Plugin annotation = worker.getClass().getAnnotation(Plugin.class);
+                String taskType = annotation.taskType();
+                if (StringUtils.isNotEmpty(taskType)) {
+                    taskTypeWorkerMap.put(taskType,worker);
+                }
+            }
+        }
+        log.info("================WorkerManager初始化完毕");
+    }
+    @Override
+    public void onMessage(TaskRunMessage message) {
+        log.info("===============接收到消息================");
+        try {
+            PipTask task = message.getTask();
+            String taskType = task.getTaskType();
+            BaseWorker baseWorker = taskTypeWorkerMap.get(taskType);
+            baseWorker.setContextKey(task.getId());
+            workerExecutor.execute(baseWorker);
+        } catch (Exception e) {
+            // TODO consider setting up a dedicated queue for failed messages later
+        }
+    }
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+}

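For orientation (not part of the diff): because @Plugin is now meta-annotated with @Component, each @Plugin class under cd.casic.ci.process.engine.worker is registered as a Spring bean, which is what lets init() resolve it via applicationContext.getBean(workerClass) and key it by taskType. A minimal sketch of an additional worker under those assumptions; the class name and task type below are made up:

package cd.casic.ci.process.engine.worker;

import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import lombok.extern.slf4j.Slf4j;

// Lives in the scanned package and carries @Plugin, so DefaultWorkerManager.init()
// puts it into taskTypeWorkerMap under "shellTask"; onMessage() then routes any
// PipTask whose taskType is "shellTask" to execute().
@Slf4j
@Plugin(taskType = "shellTask")
public class ShellWorkerSketch extends BaseWorker {
    @Override
    public void execute(TaskRunContext context) {
        log.info("running task {}", context.getContextDef().getId());
    }
}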
View File

@@ -0,0 +1,20 @@
+package cd.casic.ci.process.engine.message;
+import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
+import cd.casic.ci.process.process.dataObject.task.PipTask;
+import cd.casic.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
+import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+@EqualsAndHashCode(callSuper = true)
+@Data
+@NoArgsConstructor
+public class TaskRunMessage extends AbstractRedisStreamMessage {
+    private PipTask task;
+    public TaskRunMessage(PipTask task) {
+        this.task = task;
+    }
+}

View File

@@ -54,20 +54,35 @@ public abstract class BaseRunContext {
         this.localVariables = localVariables;
         this.childContext = childContext;
     }
+    public void changeContextState(ContextStateEnum stateEnum){
+        ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
+        if (ContextStateEnum.canGoto(curr,stateEnum)) {
+            state.compareAndExchange(curr.getCode(),stateEnum.getCode());
+            callParentChange(stateEnum);
+        }
+    }
+    // make sure we always operate on the same AtomicInteger reference
+    private void setState(AtomicInteger state) {
+        this.state = state;
+    }
     /**
      * get the current context or a child context
      * */
-    public abstract BaseRunContext getChildRunContext(String key);
+    public abstract BaseRunContext getRunContext(String key);
     public abstract void putChildRunContext(String key,BaseRunContext context);
     public void callParentChange(ContextStateEnum state){
-        if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
-            checkChildEnd();
-        } else if(ContextStateEnum.READY.equals(state)){
-            checkChildReady();
-        } else if(ContextStateEnum.RUNNING.equals(state)){
-            checkChildRunning();
+        if (parentContext==null) {
+            return;
         }
+        if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
+            parentContext.checkChildEnd();
+        } else if(ContextStateEnum.READY.equals(state)){
+            parentContext.checkChildReady();
+        } else if(ContextStateEnum.RUNNING.equals(state)){
+            parentContext.checkChildRunning();
+        }
+        parentContext.callParentChange(state);
     }
     /**
      * check whether all children have finished; if they all have, the parent finishes as well
@@ -94,12 +109,13 @@ public abstract class BaseRunContext {
                 end = true;
             }
         }
-        if (!end) {
-            throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
+        if (end) {
+            this.changeContextState(ContextStateEnum.getByCode(result));
         }
     }
     /**
      * check whether all children are ready; if they all are, the parent becomes ready as well
+     * TODO the logic here may be slightly off
      * */
     public void checkChildReady() throws ServiceException{
         int result = ContextStateEnum.READY.getCode();
@@ -124,6 +140,7 @@ public abstract class BaseRunContext {
     }
     /**
      * check whether any child has started running; if so, the parent state becomes running
+     * TODO the logic here may be slightly off
      * */
     public void checkChildRunning() throws ServiceException{
         Boolean runningFlag = false;

View File

@@ -35,10 +35,13 @@ public class PipelineRunContext extends BaseRunContext{
      * a pipeline has multiple stages and each stage contains multiple tasks; second-level contexts are not stored here
      * */
     @Override
-    public BaseRunContext getChildRunContext(String stageId) {
+    public BaseRunContext getRunContext(String id) {
+        if (this.getContextDef().getId().equals(id)) {
+            return this;
+        }
         Map<String, BaseRunContext> childContext = getChildContext();
         for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
-            BaseRunContext childRunContext = entry.getValue().getChildRunContext(stageId);
+            BaseRunContext childRunContext = entry.getValue().getRunContext(id);
             if (childRunContext!=null) {
                 return childRunContext;
             }
View File

@@ -4,21 +4,25 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
 import cd.casic.ci.process.process.dataObject.stage.PipStage;
 import cd.casic.framework.commons.exception.ServiceException;
 import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
+import cd.casic.framework.commons.util.collection.CollectionUtils;
 import java.time.LocalDateTime;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 public class SecondStageRunContext extends BaseRunContext{
-    public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
-        super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), globalVariables, localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size()));
+    public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> localVariables) {
+        super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), localVariables, new ConcurrentHashMap<>());
     }
     @Override
-    public BaseRunContext getChildRunContext(String taskId) {
+    public BaseRunContext getRunContext(String id) {
+        if (this.getContextDef().getId().equals(id)) {
+            return this;
+        }
         Map<String, BaseRunContext> childContext = getChildContext();
         for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
-            BaseRunContext childRunContext = entry.getValue().getChildRunContext(taskId);
+            BaseRunContext childRunContext = entry.getValue().getRunContext(id);
             if (childRunContext!=null) {
                 return childRunContext;
             }
View File

@@ -11,8 +11,8 @@ import java.util.HashMap;
 import java.util.Map;
 public class TaskRunContext extends BaseRunContext{
-    public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
-        super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>());
+    public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext,Map<String,Object> localVariable) {
+        super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(),localVariable, new HashMap<>());
     }
     private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
         super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
@@ -22,8 +22,8 @@ public class TaskRunContext extends BaseRunContext{
      * task is the last level and has no children, so if the id matches it returns itself directly
      * */
     @Override
-    public BaseRunContext getChildRunContext(String id) {
-        if (!StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) {
+    public BaseRunContext getRunContext(String id) {
+        if (StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) {
             return null;
         }
         return this;

View File

@@ -0,0 +1,50 @@
+package cd.casic.ci.process.engine.worker;
+import cd.casic.ci.common.pipeline.annotation.Plugin;
+import cd.casic.ci.process.engine.enums.ContextStateEnum;
+import cd.casic.ci.process.engine.manager.RunContextManager;
+import cd.casic.ci.process.engine.runContext.BaseRunContext;
+import cd.casic.ci.process.engine.runContext.PipelineRunContext;
+import cd.casic.ci.process.engine.runContext.TaskRunContext;
+import cd.casic.framework.commons.exception.ServiceException;
+import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.Resource;
+import jodd.util.StringUtil;
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+@Data
+public abstract class BaseWorker implements Runnable{
+    // common worker properties
+    @Resource
+    private RunContextManager contextManager;
+    private String contextKey;
+    @Override
+    public void run() {
+        if (StringUtils.isEmpty(contextKey)) {
+            throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"请设置当前worker的contextKey");
+        }
+        doWorker(contextKey);
+    }
+    public void doWorker(String contextKey){
+        BaseRunContext context = contextManager.getContext(contextKey);
+        if (context instanceof TaskRunContext taskRunContext){
+            try {
+                taskRunContext.changeContextState(ContextStateEnum.RUNNING);
+                execute(taskRunContext);
+            } catch (Exception e) {
+                taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
+            }
+            // TODO when execution finishes, update the context state and notify the parent
+            taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING);
+        }
+    }
+    public abstract void execute(TaskRunContext context);
+}

View File

@@ -0,0 +1,23 @@
+package cd.casic.ci.process.engine.worker;
+import cd.casic.ci.common.pipeline.annotation.Plugin;
+import cd.casic.ci.process.engine.runContext.TaskRunContext;
+import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
+import com.alibaba.fastjson.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+@Slf4j
+@Plugin(taskType = "testTask")
+public class TestWorker extends BaseWorker{
+    @Override
+    public void execute(TaskRunContext context) {
+        PipBaseElement contextDef = context.getContextDef();
+        String id = contextDef.getId();
+        log.info("==============触发worker执行========");
+        log.info("==========运行context{}===========", JSON.toJSONString(context));
+    }
+}

View File

@@ -3,6 +3,8 @@ package cd.casic.ci.process.process.converter;
 import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp;
 import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
 import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.mapstruct.Mappings;
 import org.mapstruct.factory.Mappers;
 import java.util.List;
@@ -17,7 +19,6 @@ import java.util.List;
 @Mapper(componentModel = "spring")
 public interface PipelineConverter {
     PipelineConverter INSTANCE = Mappers.getMapper(PipelineConverter.class);
     PipelineFindResp toResp(PipPipeline pipPipeline);
     List<PipelineFindResp> toRespList(List<PipPipeline> pipPipelines);

View File

@@ -32,22 +32,10 @@ import java.util.Set;
 @Service
 public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements TaskService {
-    private static final String basePackage = "cd.casic.ci.process.process.service";
-    private Set<BeanDefinition> candidates;
     @Resource
     private PipTaskDao taskDao;
-    @PostConstruct
-    public void init(){
-        ClassPathScanningCandidateComponentProvider provider =
-                new ClassPathScanningCandidateComponentProvider(false);
-        provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class));
-        candidates = provider.findCandidateComponents(basePackage);
-        for (BeanDefinition candidate : candidates) {
-        }
-    }
     @Override
     public void taskTypeExist(String taskType) {

View File

@@ -1,7 +0,0 @@
-package cd.casic.ci.process.process.service.worker;
-import cd.casic.ci.common.pipeline.annotation.Plugin;
-@Plugin
-public class TestWorker {
-}

View File

@@ -0,0 +1,61 @@
+package cd.casic.server;
+import cd.casic.ci.process.engine.manager.RunContextManager;
+import cd.casic.ci.process.engine.message.TaskRunMessage;
+import cd.casic.ci.process.engine.runContext.PipelineRunContext;
+import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
+import cd.casic.ci.process.engine.runContext.TaskRunContext;
+import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
+import cd.casic.ci.process.process.dataObject.stage.PipStage;
+import cd.casic.ci.process.process.dataObject.task.PipTask;
+import cd.casic.framework.mq.redis.core.RedisMQTemplate;
+import jakarta.annotation.Resource;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+@SpringBootTest(classes = {OpsServerApplication.class})
+@ActiveProfiles("local")
+public class RedisMqTest {
+    @Resource
+    RedisMQTemplate redisMQTemplate;
+    @Resource
+    RunContextManager contextManager;
+    @Test
+    public void test01(){
+        System.out.println("h w !");
+        redisMQTemplate.send(new TaskRunMessage());
+    }
+    @Test
+    public void test02(){
+        System.out.println("h w !");
+        TaskRunMessage taskRunMessage = new TaskRunMessage();
+        PipTask pipTask = new PipTask();
+        pipTask.setTaskType("testTask");
+        pipTask.setId("testTaskId");
+        pipTask.setStageId("testStage");
+        PipPipeline pipeline = new PipPipeline();
+        pipeline.setId("testPipeline");
+        PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline);
+        PipStage stage = new PipStage();
+        stage.setId("testStage");
+        stage.setParentId("testPipeline");
+        SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>());
+        TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext,new HashMap<>());
+        contextManager.contextRegister(pipelineRunContext);
+        contextManager.contextRegister(secondStageRunContext);
+        contextManager.contextRegister(taskRunContext);
+        taskRunMessage.setTask(pipTask);
+        redisMQTemplate.send(taskRunMessage);
+        try {
+            Thread.sleep(5000L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
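A hypothetical follow-up to test02 above (not part of the diff): instead of sleeping a fixed 5 seconds, the test could poll the registered task context the same way SerialDispatcher does, waiting for the worker to flip it to a terminal state. This sketch assumes the extra imports for BaseRunContext, ContextStateEnum and java.util.concurrent.atomic.AtomicInteger, and reuses the "testTaskId" registered in test02:

    // Poll the shared AtomicInteger held by the task context; the worker's
    // changeContextState(HAPPY_ENDING / BAD_ENDING) call ends the loop.
    private void awaitTaskEnd() throws InterruptedException {
        BaseRunContext context = contextManager.getContext("testTaskId");
        if (context instanceof TaskRunContext taskRunContext) {
            AtomicInteger state = taskRunContext.getState();
            while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
                    && state.get() != ContextStateEnum.BAD_ENDING.getCode()) {
                Thread.sleep(200L);
            }
        }
    }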