Compare commits
No commits in common. "6b79782a2bef9fdfb8a1d57c5b5dc0e8db62687c" and "575b415c162ddb0f34979b14dd3420c998e4c8a2" have entirely different histories.
6b79782a2b
...
575b415c16
@ -1,7 +1,5 @@
|
|||||||
package cd.casic.ci.common.pipeline.annotation;
|
package cd.casic.ci.common.pipeline.annotation;
|
||||||
|
|
||||||
import org.springframework.core.annotation.AliasFor;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
import org.springframework.stereotype.Indexed;
|
import org.springframework.stereotype.Indexed;
|
||||||
|
|
||||||
import java.lang.annotation.*;
|
import java.lang.annotation.*;
|
||||||
@ -10,7 +8,5 @@ import java.lang.annotation.*;
|
|||||||
@Retention(RetentionPolicy.RUNTIME)
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
@Documented
|
@Documented
|
||||||
@Indexed
|
@Indexed
|
||||||
@Component
|
|
||||||
public @interface Plugin {
|
public @interface Plugin {
|
||||||
String taskType();
|
|
||||||
}
|
}
|
||||||
|
@ -10,37 +10,13 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
public class ExecutorConfig {
|
public class ExecutorConfig {
|
||||||
@Bean("parallelExecutor")
|
@Bean("pipelineExecutor")
|
||||||
public ThreadPoolTaskExecutor pipelineExecutor() {
|
public ThreadPoolTaskExecutor taskExecutor() {
|
||||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
executor.setCorePoolSize(5);
|
executor.setCorePoolSize(5);
|
||||||
executor.setMaxPoolSize(10);
|
executor.setMaxPoolSize(10);
|
||||||
executor.setQueueCapacity(100);
|
executor.setQueueCapacity(100);
|
||||||
executor.setThreadNamePrefix("Parallel-");
|
executor.setThreadNamePrefix("Pipeline-");
|
||||||
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
|
|
||||||
executor.setRejectedExecutionHandler(callerRunsPolicy);
|
|
||||||
executor.initialize(); // 必须手动触发初始化
|
|
||||||
return executor;
|
|
||||||
}
|
|
||||||
@Bean("serialExecutor")
|
|
||||||
public ThreadPoolTaskExecutor serialExecutor() {
|
|
||||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
|
||||||
executor.setCorePoolSize(5);
|
|
||||||
executor.setMaxPoolSize(10);
|
|
||||||
executor.setQueueCapacity(100);
|
|
||||||
executor.setThreadNamePrefix("Serial-");
|
|
||||||
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
|
|
||||||
executor.setRejectedExecutionHandler(callerRunsPolicy);
|
|
||||||
executor.initialize(); // 必须手动触发初始化
|
|
||||||
return executor;
|
|
||||||
}
|
|
||||||
@Bean("workerExecutor")
|
|
||||||
public ThreadPoolTaskExecutor workerExecutor() {
|
|
||||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
|
||||||
executor.setCorePoolSize(5);
|
|
||||||
executor.setMaxPoolSize(10);
|
|
||||||
executor.setQueueCapacity(100);
|
|
||||||
executor.setThreadNamePrefix("Worker-");
|
|
||||||
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
|
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
|
||||||
executor.setRejectedExecutionHandler(callerRunsPolicy);
|
executor.setRejectedExecutionHandler(callerRunsPolicy);
|
||||||
executor.initialize(); // 必须手动触发初始化
|
executor.initialize(); // 必须手动触发初始化
|
||||||
|
@ -7,9 +7,7 @@ import cd.casic.ci.process.engine.runContext.PipelineRunContext;
|
|||||||
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
||||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||||
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
||||||
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
|
|
||||||
import org.springframework.core.task.TaskExecutor;
|
import org.springframework.core.task.TaskExecutor;
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -26,17 +24,13 @@ public class ParallelDispatcher implements BaseDispatcher{
|
|||||||
private Integer stageIndex;
|
private Integer stageIndex;
|
||||||
private PipelineRunContext pipelineRunContext;
|
private PipelineRunContext pipelineRunContext;
|
||||||
private RunContextManager runContextManager;
|
private RunContextManager runContextManager;
|
||||||
private RedisMQTemplate redisMQTemplate;
|
|
||||||
private ThreadPoolTaskExecutor taskExecutor;
|
|
||||||
|
|
||||||
public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager,RedisMQTemplate redisMQTemplate,ThreadPoolTaskExecutor taskExecutor) {
|
public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager) {
|
||||||
this.firstStageList = firstStageList;
|
this.firstStageList = firstStageList;
|
||||||
this.pipelineRunContext = context;
|
this.pipelineRunContext = context;
|
||||||
this.stageIndex = 0;
|
this.stageIndex = 0;
|
||||||
this.runContextManager = contextManager;
|
this.runContextManager = contextManager;
|
||||||
contextManager.contextRegister(context);
|
contextManager.contextRegister(context);
|
||||||
this.redisMQTemplate = redisMQTemplate;
|
|
||||||
this.taskExecutor = taskExecutor;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -51,16 +45,15 @@ public class ParallelDispatcher implements BaseDispatcher{
|
|||||||
CountDownLatch latch = new CountDownLatch(stageList.size());
|
CountDownLatch latch = new CountDownLatch(stageList.size());
|
||||||
for (PipStage secondStage : stageList) {
|
for (PipStage secondStage : stageList) {
|
||||||
// 二阶段下所有task是串行所以不用关心线程安全相关信息
|
// 二阶段下所有task是串行所以不用关心线程安全相关信息
|
||||||
SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new ConcurrentHashMap<>());
|
SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new HashMap<>(),new HashMap<>());
|
||||||
|
|
||||||
runContextManager.contextRegister(context);
|
runContextManager.contextRegister(context);
|
||||||
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate);
|
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch);
|
||||||
// 给线程池进行执行
|
// 给线程池进行执行
|
||||||
taskExecutor.execute(serialDispatcher);
|
|
||||||
}
|
}
|
||||||
// 等待当前阶段执行
|
|
||||||
latch.await();
|
latch.await();
|
||||||
}
|
}
|
||||||
// 入库
|
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -1,31 +1,22 @@
|
|||||||
package cd.casic.ci.process.engine.dispatcher.impl;
|
package cd.casic.ci.process.engine.dispatcher.impl;
|
||||||
|
|
||||||
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
|
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
|
||||||
import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
|
||||||
import cd.casic.ci.process.engine.manager.RunContextManager;
|
|
||||||
import cd.casic.ci.process.engine.message.TaskRunMessage;
|
|
||||||
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
||||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
|
||||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||||
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
||||||
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
||||||
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
||||||
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
public class SerialDispatcher implements BaseDispatcher {
|
public class SerialDispatcher implements BaseDispatcher {
|
||||||
private SecondStageRunContext stageRunContext;
|
private SecondStageRunContext stageRunContext;
|
||||||
private List<PipTask> taskList;
|
private List<PipTask> taskList;
|
||||||
private CountDownLatch latch;
|
private CountDownLatch latch;
|
||||||
private String triggerModel;
|
private String triggerModel;
|
||||||
private RedisMQTemplate redisMQTemplate;
|
|
||||||
private RunContextManager contextManager;
|
|
||||||
|
|
||||||
public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch,RunContextManager contextManager,RedisMQTemplate redisMQTemplate) {
|
public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) {
|
||||||
this.stageRunContext = stageRunContext;
|
this.stageRunContext = stageRunContext;
|
||||||
PipBaseElement contextDef = stageRunContext.getContextDef();
|
PipBaseElement contextDef = stageRunContext.getContextDef();
|
||||||
if (contextDef instanceof PipStage secondStage) {
|
if (contextDef instanceof PipStage secondStage) {
|
||||||
@ -33,27 +24,12 @@ public class SerialDispatcher implements BaseDispatcher {
|
|||||||
this.taskList = secondStage.getTaskValues();
|
this.taskList = secondStage.getTaskValues();
|
||||||
}
|
}
|
||||||
this.latch = latch;
|
this.latch = latch;
|
||||||
this.redisMQTemplate = redisMQTemplate;
|
|
||||||
this.contextManager = contextManager;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dispatch() throws InterruptedException {
|
public void dispatch() {
|
||||||
for (PipTask pipTask : taskList) {
|
for (PipTask pipTask : taskList) {
|
||||||
// 注册taskContext,且发送消息至消息队列给work执行, 如果需要则传入参数
|
// TODO 注册taskContext,且发送消息至消息队列给work执行
|
||||||
TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>());
|
|
||||||
contextManager.contextRegister(taskRunContext);
|
|
||||||
taskRunContext.changeContextState(ContextStateEnum.READY);
|
|
||||||
TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
|
|
||||||
redisMQTemplate.send(taskRunMessage);
|
|
||||||
// TODO 监听当前taskContext状态变成执行成功或者执行失败(worker当中改变状态为运行中、执行成功、执行失败)
|
|
||||||
//
|
|
||||||
AtomicInteger state = taskRunContext.getState();
|
|
||||||
while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
|
|
||||||
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()) {
|
|
||||||
Thread.sleep(1000L);
|
|
||||||
}
|
|
||||||
//
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,10 +40,10 @@ public class SerialDispatcher implements BaseDispatcher {
|
|||||||
// TODO 看看需要内存入库还是忽略掉当前执行,进行入库(countDown放行)
|
// TODO 看看需要内存入库还是忽略掉当前执行,进行入库(countDown放行)
|
||||||
// stageRunContext.callParentChange();
|
// stageRunContext.callParentChange();
|
||||||
dispatch();
|
dispatch();
|
||||||
|
latch.countDown();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// throw new RuntimeException(e);
|
// throw new RuntimeException(e);
|
||||||
// TODO 当前stage标记为失败等待父context发现并处理
|
// TODO 当前stage标记为失败等待父context发现并处理
|
||||||
}
|
}
|
||||||
latch.countDown();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,6 @@ import cd.casic.ci.process.process.service.pipeline.PipelineService;
|
|||||||
import cd.casic.ci.process.process.service.stage.StageService;
|
import cd.casic.ci.process.process.service.stage.StageService;
|
||||||
import cd.casic.framework.commons.exception.ServiceException;
|
import cd.casic.framework.commons.exception.ServiceException;
|
||||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
||||||
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
|
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
@ -34,11 +33,8 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
|
|||||||
@Resource
|
@Resource
|
||||||
private RunContextManager runContextManager;
|
private RunContextManager runContextManager;
|
||||||
@Resource
|
@Resource
|
||||||
private ThreadPoolTaskExecutor parallelExecutor;
|
@Qualifier("pipelineExecutor")
|
||||||
@Resource
|
private ThreadPoolTaskExecutor taskExecutor;
|
||||||
private ThreadPoolTaskExecutor serialExecutor;
|
|
||||||
@Resource
|
|
||||||
private RedisMQTemplate redisMQTemplate;
|
|
||||||
@Override
|
@Override
|
||||||
public PipelineRunContext execute(String pipelineId) {
|
public PipelineRunContext execute(String pipelineId) {
|
||||||
PipPipeline pipeline = pipelineService.getById(pipelineId);
|
PipPipeline pipeline = pipelineService.getById(pipelineId);
|
||||||
@ -52,10 +48,9 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
|
|||||||
if (CollectionUtils.isEmpty(mainStage)) {
|
if (CollectionUtils.isEmpty(mainStage)) {
|
||||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
|
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
|
||||||
}
|
}
|
||||||
// 如果要做 容灾就需要重新将数据库存的记录按顺序加载入
|
|
||||||
PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
|
PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
|
||||||
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
|
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager);
|
||||||
parallelExecutor.execute(parallelDispatcher);
|
taskExecutor.execute(parallelDispatcher);
|
||||||
return pipelineRunContext;
|
return pipelineRunContext;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,4 @@
|
|||||||
package cd.casic.ci.process.engine.manager;
|
package cd.casic.ci.process.engine.manager;
|
||||||
/**
|
|
||||||
* 负责监听队列,找到ContextManager获取runContext,然后实际执行
|
|
||||||
* */
|
|
||||||
public interface WorkerManager {
|
public interface WorkerManager {
|
||||||
}
|
}
|
||||||
|
@ -2,20 +2,10 @@ package cd.casic.ci.process.engine.manager.impl;
|
|||||||
|
|
||||||
import cd.casic.ci.process.engine.manager.RunContextManager;
|
import cd.casic.ci.process.engine.manager.RunContextManager;
|
||||||
import cd.casic.ci.process.engine.runContext.BaseRunContext;
|
import cd.casic.ci.process.engine.runContext.BaseRunContext;
|
||||||
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
|
|
||||||
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
|
||||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
|
||||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
|
||||||
import cd.casic.framework.commons.exception.ServiceException;
|
|
||||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
public class DefaultRunContextManager implements RunContextManager {
|
public class DefaultRunContextManager implements RunContextManager {
|
||||||
private final Map<String,PipelineRunContext> contextMap= new ConcurrentHashMap();
|
|
||||||
@Override
|
@Override
|
||||||
public Boolean stopPipeline(String pipelineId) {
|
public Boolean stopPipeline(String pipelineId) {
|
||||||
return null;
|
return null;
|
||||||
@ -38,50 +28,16 @@ public class DefaultRunContextManager implements RunContextManager {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Boolean suspendStage(String pipelineId, String stageId) {
|
public Boolean suspendStage(String pipelineId, String stageId) {
|
||||||
PipelineRunContext pipelineRunContext = contextMap.get(pipelineId);
|
|
||||||
if (pipelineRunContext !=null) {
|
|
||||||
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void contextRegister(BaseRunContext context) {
|
public void contextRegister(BaseRunContext context) {
|
||||||
PipBaseElement contextDef = context.getContextDef();
|
|
||||||
String id = contextDef.getId();
|
|
||||||
BaseRunContext parentContext = context.getParentContext();
|
|
||||||
if (context instanceof PipelineRunContext pipelineRunContext) {
|
|
||||||
contextMap.put(id,pipelineRunContext);
|
|
||||||
} else {
|
|
||||||
if (parentContext==null) {
|
|
||||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"注册context失败");
|
|
||||||
}
|
|
||||||
PipBaseElement parentContextDef = parentContext.getContextDef();
|
|
||||||
BaseRunContext parentRunContext = null;
|
|
||||||
for (Map.Entry<String, PipelineRunContext> entry : contextMap.entrySet()) {
|
|
||||||
PipelineRunContext value = entry.getValue();
|
|
||||||
BaseRunContext runContext = value.getRunContext(parentContextDef.getId());
|
|
||||||
if (runContext!=null) {
|
|
||||||
parentRunContext = runContext;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (parentRunContext==null) {
|
|
||||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到父级context");
|
|
||||||
}
|
|
||||||
parentRunContext.putChildRunContext(id,context);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BaseRunContext getContext(String key) {
|
public BaseRunContext getContext(String key) {
|
||||||
for (Map.Entry<String, PipelineRunContext> entry : contextMap.entrySet()) {
|
|
||||||
PipelineRunContext value = entry.getValue();
|
|
||||||
BaseRunContext runContext = value.getRunContext(key);
|
|
||||||
if (runContext!=null) {
|
|
||||||
return runContext;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,86 +0,0 @@
|
|||||||
package cd.casic.ci.process.engine.manager.impl;
|
|
||||||
|
|
||||||
import cd.casic.ci.common.pipeline.annotation.Plugin;
|
|
||||||
import cd.casic.ci.process.engine.message.TaskRunMessage;
|
|
||||||
import cd.casic.ci.process.engine.worker.BaseWorker;
|
|
||||||
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
|
||||||
import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
|
||||||
import jakarta.annotation.PostConstruct;
|
|
||||||
import jakarta.annotation.Resource;
|
|
||||||
import jodd.util.StringUtil;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.springframework.beans.BeansException;
|
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
|
||||||
import org.springframework.beans.factory.config.BeanDefinition;
|
|
||||||
import org.springframework.context.ApplicationContext;
|
|
||||||
import org.springframework.context.ApplicationContextAware;
|
|
||||||
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
|
|
||||||
import org.springframework.core.annotation.AnnotatedElementUtils;
|
|
||||||
import org.springframework.core.type.filter.AnnotationTypeFilter;
|
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
@Slf4j
|
|
||||||
public class DefaultWorkerManager extends AbstractRedisStreamMessageListener<TaskRunMessage> implements ApplicationContextAware {
|
|
||||||
private static final String basePackage = "cd.casic.ci.process.engine.worker";
|
|
||||||
private Set<BeanDefinition> candidates;
|
|
||||||
private ApplicationContext applicationContext;
|
|
||||||
private Map<String,BaseWorker> taskTypeWorkerMap = null;
|
|
||||||
@Resource
|
|
||||||
private ThreadPoolTaskExecutor workerExecutor;
|
|
||||||
|
|
||||||
private void setTaskTypeWorker(Map<String, BaseWorker> taskTypeWorker) {
|
|
||||||
this.taskTypeWorkerMap = taskTypeWorker;
|
|
||||||
}
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init(){
|
|
||||||
ClassPathScanningCandidateComponentProvider provider =
|
|
||||||
new ClassPathScanningCandidateComponentProvider(false);
|
|
||||||
provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class));
|
|
||||||
candidates = provider.findCandidateComponents(basePackage);
|
|
||||||
taskTypeWorkerMap = new HashMap<>(candidates.size());
|
|
||||||
for (BeanDefinition candidate : candidates) {
|
|
||||||
String beanClassName = candidate.getBeanClassName();
|
|
||||||
Class<?> workerClass = null;
|
|
||||||
try {
|
|
||||||
workerClass = Class.forName(beanClassName);
|
|
||||||
} catch (ClassNotFoundException e) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Object bean = applicationContext.getBean(workerClass);
|
|
||||||
if (bean instanceof BaseWorker worker) {
|
|
||||||
Plugin annotation = worker.getClass().getAnnotation(Plugin.class);
|
|
||||||
String taskType = annotation.taskType();
|
|
||||||
if (StringUtils.isNotEmpty(taskType)) {
|
|
||||||
taskTypeWorkerMap.put(taskType,worker);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.info("================WorkerManager初始化完毕");
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public void onMessage(TaskRunMessage message) {
|
|
||||||
log.info("===============接收到消息================");
|
|
||||||
try {
|
|
||||||
PipTask task = message.getTask();
|
|
||||||
String taskType = task.getTaskType();
|
|
||||||
BaseWorker baseWorker = taskTypeWorkerMap.get(taskType);
|
|
||||||
baseWorker.setContextKey(task.getId());
|
|
||||||
workerExecutor.execute(baseWorker);
|
|
||||||
}catch (Exception e){
|
|
||||||
// TODO 后期可以考虑专门整一个队列
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
||||||
this.applicationContext = applicationContext;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,20 +0,0 @@
|
|||||||
package cd.casic.ci.process.engine.message;
|
|
||||||
|
|
||||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
|
||||||
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
|
||||||
import cd.casic.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
|
|
||||||
import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.EqualsAndHashCode;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
@EqualsAndHashCode(callSuper = true)
|
|
||||||
@Data
|
|
||||||
@NoArgsConstructor
|
|
||||||
public class TaskRunMessage extends AbstractRedisStreamMessage {
|
|
||||||
private PipTask task;
|
|
||||||
|
|
||||||
public TaskRunMessage(PipTask task) {
|
|
||||||
this.task = task;
|
|
||||||
}
|
|
||||||
}
|
|
@ -54,35 +54,20 @@ public abstract class BaseRunContext {
|
|||||||
this.localVariables = localVariables;
|
this.localVariables = localVariables;
|
||||||
this.childContext = childContext;
|
this.childContext = childContext;
|
||||||
}
|
}
|
||||||
public void changeContextState(ContextStateEnum stateEnum){
|
|
||||||
ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
|
|
||||||
if (ContextStateEnum.canGoto(curr,stateEnum)) {
|
|
||||||
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
|
|
||||||
callParentChange(stateEnum);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// 保证一直都操作同一个引用的值
|
|
||||||
private void setState(AtomicInteger state) {
|
|
||||||
this.state = state;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取当前或者子上下文
|
* 获取当前或者子上下文
|
||||||
* */
|
* */
|
||||||
public abstract BaseRunContext getRunContext(String key);
|
public abstract BaseRunContext getChildRunContext(String key);
|
||||||
public abstract void putChildRunContext(String key,BaseRunContext context);
|
public abstract void putChildRunContext(String key,BaseRunContext context);
|
||||||
public void callParentChange(ContextStateEnum state){
|
public void callParentChange(ContextStateEnum state){
|
||||||
if (parentContext==null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
|
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
|
||||||
parentContext.checkChildEnd();
|
checkChildEnd();
|
||||||
} else if(ContextStateEnum.READY.equals(state)){
|
} else if(ContextStateEnum.READY.equals(state)){
|
||||||
parentContext.checkChildReady();
|
checkChildReady();
|
||||||
} else if(ContextStateEnum.RUNNING.equals(state)){
|
} else if(ContextStateEnum.RUNNING.equals(state)){
|
||||||
parentContext.checkChildRunning();
|
checkChildRunning();
|
||||||
}
|
}
|
||||||
parentContext.callParentChange(state);
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* 查找子类是否全部完成,如果子类全部完成则父类也完成
|
* 查找子类是否全部完成,如果子类全部完成则父类也完成
|
||||||
@ -109,13 +94,12 @@ public abstract class BaseRunContext {
|
|||||||
end = true;
|
end = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (end) {
|
if (!end) {
|
||||||
this.changeContextState(ContextStateEnum.getByCode(result));
|
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* 查找子类是否全部就绪,如果子类全部完成则父类也就绪
|
* 查找子类是否全部就绪,如果子类全部完成则父类也就绪
|
||||||
* TODO 逻辑可能有点问题
|
|
||||||
* */
|
* */
|
||||||
public void checkChildReady() throws ServiceException{
|
public void checkChildReady() throws ServiceException{
|
||||||
int result = ContextStateEnum.READY.getCode();
|
int result = ContextStateEnum.READY.getCode();
|
||||||
@ -140,7 +124,6 @@ public abstract class BaseRunContext {
|
|||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* 查找子类是否存在开始运行的,如果有则父状态变成running
|
* 查找子类是否存在开始运行的,如果有则父状态变成running
|
||||||
* TODO 逻辑可能有点问题
|
|
||||||
* */
|
* */
|
||||||
public void checkChildRunning() throws ServiceException{
|
public void checkChildRunning() throws ServiceException{
|
||||||
Boolean runningFlag = false;
|
Boolean runningFlag = false;
|
||||||
|
@ -35,13 +35,10 @@ public class PipelineRunContext extends BaseRunContext{
|
|||||||
* pipeline 底下有多个阶段,多个阶段包含多个task 不保存第二级context
|
* pipeline 底下有多个阶段,多个阶段包含多个task 不保存第二级context
|
||||||
* */
|
* */
|
||||||
@Override
|
@Override
|
||||||
public BaseRunContext getRunContext(String id) {
|
public BaseRunContext getChildRunContext(String stageId) {
|
||||||
if (this.getContextDef().getId().equals(id)) {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
Map<String, BaseRunContext> childContext = getChildContext();
|
Map<String, BaseRunContext> childContext = getChildContext();
|
||||||
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
||||||
BaseRunContext childRunContext = entry.getValue().getRunContext(id);
|
BaseRunContext childRunContext = entry.getValue().getChildRunContext(stageId);
|
||||||
if (childRunContext!=null) {
|
if (childRunContext!=null) {
|
||||||
return childRunContext;
|
return childRunContext;
|
||||||
}
|
}
|
||||||
|
@ -4,25 +4,21 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
|||||||
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
||||||
import cd.casic.framework.commons.exception.ServiceException;
|
import cd.casic.framework.commons.exception.ServiceException;
|
||||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
||||||
import cd.casic.framework.commons.util.collection.CollectionUtils;
|
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class SecondStageRunContext extends BaseRunContext{
|
public class SecondStageRunContext extends BaseRunContext{
|
||||||
public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> localVariables) {
|
public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
|
||||||
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), localVariables, new ConcurrentHashMap<>());
|
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), globalVariables, localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BaseRunContext getRunContext(String id) {
|
public BaseRunContext getChildRunContext(String taskId) {
|
||||||
if (this.getContextDef().getId().equals(id)) {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
Map<String, BaseRunContext> childContext = getChildContext();
|
Map<String, BaseRunContext> childContext = getChildContext();
|
||||||
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
||||||
BaseRunContext childRunContext = entry.getValue().getRunContext(id);
|
BaseRunContext childRunContext = entry.getValue().getChildRunContext(taskId);
|
||||||
if (childRunContext!=null) {
|
if (childRunContext!=null) {
|
||||||
return childRunContext;
|
return childRunContext;
|
||||||
}
|
}
|
||||||
|
@ -11,8 +11,8 @@ import java.util.HashMap;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class TaskRunContext extends BaseRunContext{
|
public class TaskRunContext extends BaseRunContext{
|
||||||
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext,Map<String,Object> localVariable) {
|
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
|
||||||
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(),localVariable, new HashMap<>());
|
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>());
|
||||||
}
|
}
|
||||||
private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
|
private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
|
||||||
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
|
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
|
||||||
@ -22,8 +22,8 @@ public class TaskRunContext extends BaseRunContext{
|
|||||||
* task是最后一层没有下一级所以如果id相同直接返回它自己
|
* task是最后一层没有下一级所以如果id相同直接返回它自己
|
||||||
* */
|
* */
|
||||||
@Override
|
@Override
|
||||||
public BaseRunContext getRunContext(String id) {
|
public BaseRunContext getChildRunContext(String id) {
|
||||||
if (StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) {
|
if (!StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
|
@ -1,50 +0,0 @@
|
|||||||
package cd.casic.ci.process.engine.worker;
|
|
||||||
|
|
||||||
import cd.casic.ci.common.pipeline.annotation.Plugin;
|
|
||||||
import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
|
||||||
import cd.casic.ci.process.engine.manager.RunContextManager;
|
|
||||||
import cd.casic.ci.process.engine.runContext.BaseRunContext;
|
|
||||||
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
|
|
||||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
|
||||||
import cd.casic.framework.commons.exception.ServiceException;
|
|
||||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
|
||||||
import jakarta.annotation.PostConstruct;
|
|
||||||
import jakarta.annotation.Resource;
|
|
||||||
import jodd.util.StringUtil;
|
|
||||||
import lombok.Data;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
||||||
|
|
||||||
|
|
||||||
@Data
|
|
||||||
|
|
||||||
public abstract class BaseWorker implements Runnable{
|
|
||||||
// 一些属性
|
|
||||||
@Resource
|
|
||||||
private RunContextManager contextManager;
|
|
||||||
private String contextKey;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
if (StringUtils.isEmpty(contextKey)) {
|
|
||||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"请设置当前worker的contextKey");
|
|
||||||
}
|
|
||||||
doWorker(contextKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void doWorker(String contextKey){
|
|
||||||
BaseRunContext context = contextManager.getContext(contextKey);
|
|
||||||
if (context instanceof TaskRunContext taskRunContext){
|
|
||||||
try {
|
|
||||||
taskRunContext.changeContextState(ContextStateEnum.RUNNING);
|
|
||||||
execute(taskRunContext);
|
|
||||||
} catch (Exception e) {
|
|
||||||
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
|
|
||||||
}
|
|
||||||
// TODO 执行结束修改context的state,并且通知父类
|
|
||||||
taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
public abstract void execute(TaskRunContext context);
|
|
||||||
|
|
||||||
}
|
|
@ -1,23 +0,0 @@
|
|||||||
package cd.casic.ci.process.engine.worker;
|
|
||||||
|
|
||||||
import cd.casic.ci.common.pipeline.annotation.Plugin;
|
|
||||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
|
||||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
|
||||||
import com.alibaba.fastjson.JSON;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
||||||
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Plugin(taskType = "testTask")
|
|
||||||
public class TestWorker extends BaseWorker{
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void execute(TaskRunContext context) {
|
|
||||||
PipBaseElement contextDef = context.getContextDef();
|
|
||||||
String id = contextDef.getId();
|
|
||||||
log.info("==============触发worker执行========");
|
|
||||||
log.info("==========运行context:{}===========", JSON.toJSONString(context));
|
|
||||||
}
|
|
||||||
}
|
|
@ -3,8 +3,6 @@ package cd.casic.ci.process.process.converter;
|
|||||||
import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp;
|
import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp;
|
||||||
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
||||||
import org.mapstruct.Mapper;
|
import org.mapstruct.Mapper;
|
||||||
import org.mapstruct.Mapping;
|
|
||||||
import org.mapstruct.Mappings;
|
|
||||||
import org.mapstruct.factory.Mappers;
|
import org.mapstruct.factory.Mappers;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -19,6 +17,7 @@ import java.util.List;
|
|||||||
@Mapper(componentModel = "spring")
|
@Mapper(componentModel = "spring")
|
||||||
public interface PipelineConverter {
|
public interface PipelineConverter {
|
||||||
PipelineConverter INSTANCE = Mappers.getMapper(PipelineConverter.class);
|
PipelineConverter INSTANCE = Mappers.getMapper(PipelineConverter.class);
|
||||||
|
|
||||||
PipelineFindResp toResp(PipPipeline pipPipeline);
|
PipelineFindResp toResp(PipPipeline pipPipeline);
|
||||||
List<PipelineFindResp> toRespList(List<PipPipeline> pipPipelines);
|
List<PipelineFindResp> toRespList(List<PipPipeline> pipPipelines);
|
||||||
|
|
||||||
|
@ -32,10 +32,22 @@ import java.util.Set;
|
|||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements TaskService {
|
public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements TaskService {
|
||||||
|
private static final String basePackage = "cd.casic.ci.process.process.service";
|
||||||
|
private Set<BeanDefinition> candidates;
|
||||||
@Resource
|
@Resource
|
||||||
private PipTaskDao taskDao;
|
private PipTaskDao taskDao;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init(){
|
||||||
|
ClassPathScanningCandidateComponentProvider provider =
|
||||||
|
new ClassPathScanningCandidateComponentProvider(false);
|
||||||
|
provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class));
|
||||||
|
candidates = provider.findCandidateComponents(basePackage);
|
||||||
|
for (BeanDefinition candidate : candidates) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public void taskTypeExist(String taskType) {
|
public void taskTypeExist(String taskType) {
|
||||||
|
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
package cd.casic.ci.process.process.service.worker;
|
||||||
|
|
||||||
|
import cd.casic.ci.common.pipeline.annotation.Plugin;
|
||||||
|
|
||||||
|
@Plugin
|
||||||
|
public class TestWorker {
|
||||||
|
}
|
@ -1,61 +0,0 @@
|
|||||||
package cd.casic.server;
|
|
||||||
|
|
||||||
import cd.casic.ci.process.engine.manager.RunContextManager;
|
|
||||||
import cd.casic.ci.process.engine.message.TaskRunMessage;
|
|
||||||
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
|
|
||||||
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
|
||||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
|
||||||
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
|
||||||
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
|
||||||
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
|
||||||
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
|
|
||||||
import jakarta.annotation.Resource;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.test.context.ActiveProfiles;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
@SpringBootTest(classes = {OpsServerApplication.class})
|
|
||||||
@ActiveProfiles("local")
|
|
||||||
public class RedisMqTest {
|
|
||||||
@Resource
|
|
||||||
RedisMQTemplate redisMQTemplate;
|
|
||||||
@Resource
|
|
||||||
RunContextManager contextManager;
|
|
||||||
@Test
|
|
||||||
public void test01(){
|
|
||||||
System.out.println("h w !");
|
|
||||||
redisMQTemplate.send(new TaskRunMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test02(){
|
|
||||||
System.out.println("h w !");
|
|
||||||
TaskRunMessage taskRunMessage = new TaskRunMessage();
|
|
||||||
PipTask pipTask = new PipTask();
|
|
||||||
pipTask.setTaskType("testTask");
|
|
||||||
pipTask.setId("testTaskId");
|
|
||||||
pipTask.setStageId("testStage");
|
|
||||||
PipPipeline pipeline = new PipPipeline();
|
|
||||||
pipeline.setId("testPipeline");
|
|
||||||
PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline);
|
|
||||||
PipStage stage = new PipStage();
|
|
||||||
stage.setId("testStage");
|
|
||||||
stage.setParentId("testPipeline");
|
|
||||||
SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>());
|
|
||||||
TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext,new HashMap<>());
|
|
||||||
contextManager.contextRegister(pipelineRunContext);
|
|
||||||
contextManager.contextRegister(secondStageRunContext);
|
|
||||||
contextManager.contextRegister(taskRunContext);
|
|
||||||
taskRunMessage.setTask(pipTask);
|
|
||||||
redisMQTemplate.send(taskRunMessage);
|
|
||||||
try {
|
|
||||||
Thread.sleep(5000L);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user