Compare commits
No commits in common. "6b79782a2bef9fdfb8a1d57c5b5dc0e8db62687c" and "575b415c162ddb0f34979b14dd3420c998e4c8a2" have entirely different histories.
6b79782a2b
...
575b415c16
@ -1,7 +1,5 @@
|
||||
package cd.casic.ci.common.pipeline.annotation;
|
||||
|
||||
import org.springframework.core.annotation.AliasFor;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Indexed;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
@ -10,7 +8,5 @@ import java.lang.annotation.*;
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@Indexed
|
||||
@Component
|
||||
public @interface Plugin {
|
||||
String taskType();
|
||||
}
|
||||
|
@ -10,37 +10,13 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
@Configuration
|
||||
public class ExecutorConfig {
|
||||
@Bean("parallelExecutor")
|
||||
public ThreadPoolTaskExecutor pipelineExecutor() {
|
||||
@Bean("pipelineExecutor")
|
||||
public ThreadPoolTaskExecutor taskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(5);
|
||||
executor.setMaxPoolSize(10);
|
||||
executor.setQueueCapacity(100);
|
||||
executor.setThreadNamePrefix("Parallel-");
|
||||
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
|
||||
executor.setRejectedExecutionHandler(callerRunsPolicy);
|
||||
executor.initialize(); // 必须手动触发初始化
|
||||
return executor;
|
||||
}
|
||||
@Bean("serialExecutor")
|
||||
public ThreadPoolTaskExecutor serialExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(5);
|
||||
executor.setMaxPoolSize(10);
|
||||
executor.setQueueCapacity(100);
|
||||
executor.setThreadNamePrefix("Serial-");
|
||||
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
|
||||
executor.setRejectedExecutionHandler(callerRunsPolicy);
|
||||
executor.initialize(); // 必须手动触发初始化
|
||||
return executor;
|
||||
}
|
||||
@Bean("workerExecutor")
|
||||
public ThreadPoolTaskExecutor workerExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(5);
|
||||
executor.setMaxPoolSize(10);
|
||||
executor.setQueueCapacity(100);
|
||||
executor.setThreadNamePrefix("Worker-");
|
||||
executor.setThreadNamePrefix("Pipeline-");
|
||||
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
|
||||
executor.setRejectedExecutionHandler(callerRunsPolicy);
|
||||
executor.initialize(); // 必须手动触发初始化
|
||||
|
@ -7,9 +7,7 @@ import cd.casic.ci.process.engine.runContext.PipelineRunContext;
|
||||
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
||||
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.Collection;
|
||||
@ -26,17 +24,13 @@ public class ParallelDispatcher implements BaseDispatcher{
|
||||
private Integer stageIndex;
|
||||
private PipelineRunContext pipelineRunContext;
|
||||
private RunContextManager runContextManager;
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
|
||||
public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager,RedisMQTemplate redisMQTemplate,ThreadPoolTaskExecutor taskExecutor) {
|
||||
public ParallelDispatcher(List<PipStage> firstStageList, PipelineRunContext context,RunContextManager contextManager) {
|
||||
this.firstStageList = firstStageList;
|
||||
this.pipelineRunContext = context;
|
||||
this.stageIndex = 0;
|
||||
this.runContextManager = contextManager;
|
||||
contextManager.contextRegister(context);
|
||||
this.redisMQTemplate = redisMQTemplate;
|
||||
this.taskExecutor = taskExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -51,16 +45,15 @@ public class ParallelDispatcher implements BaseDispatcher{
|
||||
CountDownLatch latch = new CountDownLatch(stageList.size());
|
||||
for (PipStage secondStage : stageList) {
|
||||
// 二阶段下所有task是串行所以不用关心线程安全相关信息
|
||||
SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new ConcurrentHashMap<>());
|
||||
SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new HashMap<>(),new HashMap<>());
|
||||
|
||||
runContextManager.contextRegister(context);
|
||||
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate);
|
||||
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch);
|
||||
// 给线程池进行执行
|
||||
taskExecutor.execute(serialDispatcher);
|
||||
}
|
||||
// 等待当前阶段执行
|
||||
latch.await();
|
||||
}
|
||||
// 入库
|
||||
|
||||
}
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -1,31 +1,22 @@
|
||||
package cd.casic.ci.process.engine.dispatcher.impl;
|
||||
|
||||
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
|
||||
import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
||||
import cd.casic.ci.process.engine.manager.RunContextManager;
|
||||
import cd.casic.ci.process.engine.message.TaskRunMessage;
|
||||
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
||||
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
||||
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
||||
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class SerialDispatcher implements BaseDispatcher {
|
||||
private SecondStageRunContext stageRunContext;
|
||||
private List<PipTask> taskList;
|
||||
private CountDownLatch latch;
|
||||
private String triggerModel;
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
private RunContextManager contextManager;
|
||||
|
||||
public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch,RunContextManager contextManager,RedisMQTemplate redisMQTemplate) {
|
||||
public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) {
|
||||
this.stageRunContext = stageRunContext;
|
||||
PipBaseElement contextDef = stageRunContext.getContextDef();
|
||||
if (contextDef instanceof PipStage secondStage) {
|
||||
@ -33,27 +24,12 @@ public class SerialDispatcher implements BaseDispatcher {
|
||||
this.taskList = secondStage.getTaskValues();
|
||||
}
|
||||
this.latch = latch;
|
||||
this.redisMQTemplate = redisMQTemplate;
|
||||
this.contextManager = contextManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatch() throws InterruptedException {
|
||||
public void dispatch() {
|
||||
for (PipTask pipTask : taskList) {
|
||||
// 注册taskContext,且发送消息至消息队列给work执行, 如果需要则传入参数
|
||||
TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>());
|
||||
contextManager.contextRegister(taskRunContext);
|
||||
taskRunContext.changeContextState(ContextStateEnum.READY);
|
||||
TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
|
||||
redisMQTemplate.send(taskRunMessage);
|
||||
// TODO 监听当前taskContext状态变成执行成功或者执行失败(worker当中改变状态为运行中、执行成功、执行失败)
|
||||
//
|
||||
AtomicInteger state = taskRunContext.getState();
|
||||
while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
|
||||
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()) {
|
||||
Thread.sleep(1000L);
|
||||
}
|
||||
//
|
||||
// TODO 注册taskContext,且发送消息至消息队列给work执行
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,10 +40,10 @@ public class SerialDispatcher implements BaseDispatcher {
|
||||
// TODO 看看需要内存入库还是忽略掉当前执行,进行入库(countDown放行)
|
||||
// stageRunContext.callParentChange();
|
||||
dispatch();
|
||||
latch.countDown();
|
||||
} catch (Exception e) {
|
||||
// throw new RuntimeException(e);
|
||||
// TODO 当前stage标记为失败等待父context发现并处理
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
@ -12,7 +12,6 @@ import cd.casic.ci.process.process.service.pipeline.PipelineService;
|
||||
import cd.casic.ci.process.process.service.stage.StageService;
|
||||
import cd.casic.framework.commons.exception.ServiceException;
|
||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
||||
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
@ -34,11 +33,8 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
|
||||
@Resource
|
||||
private RunContextManager runContextManager;
|
||||
@Resource
|
||||
private ThreadPoolTaskExecutor parallelExecutor;
|
||||
@Resource
|
||||
private ThreadPoolTaskExecutor serialExecutor;
|
||||
@Resource
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
@Qualifier("pipelineExecutor")
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
@Override
|
||||
public PipelineRunContext execute(String pipelineId) {
|
||||
PipPipeline pipeline = pipelineService.getById(pipelineId);
|
||||
@ -52,10 +48,9 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
|
||||
if (CollectionUtils.isEmpty(mainStage)) {
|
||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
|
||||
}
|
||||
// 如果要做 容灾就需要重新将数据库存的记录按顺序加载入
|
||||
PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
|
||||
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
|
||||
parallelExecutor.execute(parallelDispatcher);
|
||||
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager);
|
||||
taskExecutor.execute(parallelDispatcher);
|
||||
return pipelineRunContext;
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,4 @@
|
||||
package cd.casic.ci.process.engine.manager;
|
||||
/**
|
||||
* 负责监听队列,找到ContextManager获取runContext,然后实际执行
|
||||
* */
|
||||
|
||||
public interface WorkerManager {
|
||||
}
|
||||
|
@ -2,20 +2,10 @@ package cd.casic.ci.process.engine.manager.impl;
|
||||
|
||||
import cd.casic.ci.process.engine.manager.RunContextManager;
|
||||
import cd.casic.ci.process.engine.runContext.BaseRunContext;
|
||||
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
|
||||
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||
import cd.casic.framework.commons.exception.ServiceException;
|
||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Component
|
||||
public class DefaultRunContextManager implements RunContextManager {
|
||||
private final Map<String,PipelineRunContext> contextMap= new ConcurrentHashMap();
|
||||
@Override
|
||||
public Boolean stopPipeline(String pipelineId) {
|
||||
return null;
|
||||
@ -38,50 +28,16 @@ public class DefaultRunContextManager implements RunContextManager {
|
||||
|
||||
@Override
|
||||
public Boolean suspendStage(String pipelineId, String stageId) {
|
||||
PipelineRunContext pipelineRunContext = contextMap.get(pipelineId);
|
||||
if (pipelineRunContext !=null) {
|
||||
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void contextRegister(BaseRunContext context) {
|
||||
PipBaseElement contextDef = context.getContextDef();
|
||||
String id = contextDef.getId();
|
||||
BaseRunContext parentContext = context.getParentContext();
|
||||
if (context instanceof PipelineRunContext pipelineRunContext) {
|
||||
contextMap.put(id,pipelineRunContext);
|
||||
} else {
|
||||
if (parentContext==null) {
|
||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"注册context失败");
|
||||
}
|
||||
PipBaseElement parentContextDef = parentContext.getContextDef();
|
||||
BaseRunContext parentRunContext = null;
|
||||
for (Map.Entry<String, PipelineRunContext> entry : contextMap.entrySet()) {
|
||||
PipelineRunContext value = entry.getValue();
|
||||
BaseRunContext runContext = value.getRunContext(parentContextDef.getId());
|
||||
if (runContext!=null) {
|
||||
parentRunContext = runContext;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (parentRunContext==null) {
|
||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到父级context");
|
||||
}
|
||||
parentRunContext.putChildRunContext(id,context);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public BaseRunContext getContext(String key) {
|
||||
for (Map.Entry<String, PipelineRunContext> entry : contextMap.entrySet()) {
|
||||
PipelineRunContext value = entry.getValue();
|
||||
BaseRunContext runContext = value.getRunContext(key);
|
||||
if (runContext!=null) {
|
||||
return runContext;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -1,86 +0,0 @@
|
||||
package cd.casic.ci.process.engine.manager.impl;
|
||||
|
||||
import cd.casic.ci.common.pipeline.annotation.Plugin;
|
||||
import cd.casic.ci.process.engine.message.TaskRunMessage;
|
||||
import cd.casic.ci.process.engine.worker.BaseWorker;
|
||||
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
||||
import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import jodd.util.StringUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.config.BeanDefinition;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
|
||||
import org.springframework.core.annotation.AnnotatedElementUtils;
|
||||
import org.springframework.core.type.filter.AnnotationTypeFilter;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class DefaultWorkerManager extends AbstractRedisStreamMessageListener<TaskRunMessage> implements ApplicationContextAware {
|
||||
private static final String basePackage = "cd.casic.ci.process.engine.worker";
|
||||
private Set<BeanDefinition> candidates;
|
||||
private ApplicationContext applicationContext;
|
||||
private Map<String,BaseWorker> taskTypeWorkerMap = null;
|
||||
@Resource
|
||||
private ThreadPoolTaskExecutor workerExecutor;
|
||||
|
||||
private void setTaskTypeWorker(Map<String, BaseWorker> taskTypeWorker) {
|
||||
this.taskTypeWorkerMap = taskTypeWorker;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init(){
|
||||
ClassPathScanningCandidateComponentProvider provider =
|
||||
new ClassPathScanningCandidateComponentProvider(false);
|
||||
provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class));
|
||||
candidates = provider.findCandidateComponents(basePackage);
|
||||
taskTypeWorkerMap = new HashMap<>(candidates.size());
|
||||
for (BeanDefinition candidate : candidates) {
|
||||
String beanClassName = candidate.getBeanClassName();
|
||||
Class<?> workerClass = null;
|
||||
try {
|
||||
workerClass = Class.forName(beanClassName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
continue;
|
||||
}
|
||||
Object bean = applicationContext.getBean(workerClass);
|
||||
if (bean instanceof BaseWorker worker) {
|
||||
Plugin annotation = worker.getClass().getAnnotation(Plugin.class);
|
||||
String taskType = annotation.taskType();
|
||||
if (StringUtils.isNotEmpty(taskType)) {
|
||||
taskTypeWorkerMap.put(taskType,worker);
|
||||
}
|
||||
}
|
||||
}
|
||||
log.info("================WorkerManager初始化完毕");
|
||||
}
|
||||
@Override
|
||||
public void onMessage(TaskRunMessage message) {
|
||||
log.info("===============接收到消息================");
|
||||
try {
|
||||
PipTask task = message.getTask();
|
||||
String taskType = task.getTaskType();
|
||||
BaseWorker baseWorker = taskTypeWorkerMap.get(taskType);
|
||||
baseWorker.setContextKey(task.getId());
|
||||
workerExecutor.execute(baseWorker);
|
||||
}catch (Exception e){
|
||||
// TODO 后期可以考虑专门整一个队列
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
}
|
@ -1,20 +0,0 @@
|
||||
package cd.casic.ci.process.engine.message;
|
||||
|
||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
||||
import cd.casic.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
|
||||
import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class TaskRunMessage extends AbstractRedisStreamMessage {
|
||||
private PipTask task;
|
||||
|
||||
public TaskRunMessage(PipTask task) {
|
||||
this.task = task;
|
||||
}
|
||||
}
|
@ -54,35 +54,20 @@ public abstract class BaseRunContext {
|
||||
this.localVariables = localVariables;
|
||||
this.childContext = childContext;
|
||||
}
|
||||
public void changeContextState(ContextStateEnum stateEnum){
|
||||
ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
|
||||
if (ContextStateEnum.canGoto(curr,stateEnum)) {
|
||||
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
|
||||
callParentChange(stateEnum);
|
||||
}
|
||||
}
|
||||
// 保证一直都操作同一个引用的值
|
||||
private void setState(AtomicInteger state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前或者子上下文
|
||||
* */
|
||||
public abstract BaseRunContext getRunContext(String key);
|
||||
public abstract BaseRunContext getChildRunContext(String key);
|
||||
public abstract void putChildRunContext(String key,BaseRunContext context);
|
||||
public void callParentChange(ContextStateEnum state){
|
||||
if (parentContext==null) {
|
||||
return;
|
||||
}
|
||||
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
|
||||
parentContext.checkChildEnd();
|
||||
checkChildEnd();
|
||||
} else if(ContextStateEnum.READY.equals(state)){
|
||||
parentContext.checkChildReady();
|
||||
checkChildReady();
|
||||
} else if(ContextStateEnum.RUNNING.equals(state)){
|
||||
parentContext.checkChildRunning();
|
||||
checkChildRunning();
|
||||
}
|
||||
parentContext.callParentChange(state);
|
||||
}
|
||||
/**
|
||||
* 查找子类是否全部完成,如果子类全部完成则父类也完成
|
||||
@ -109,13 +94,12 @@ public abstract class BaseRunContext {
|
||||
end = true;
|
||||
}
|
||||
}
|
||||
if (end) {
|
||||
this.changeContextState(ContextStateEnum.getByCode(result));
|
||||
if (!end) {
|
||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 查找子类是否全部就绪,如果子类全部完成则父类也就绪
|
||||
* TODO 逻辑可能有点问题
|
||||
* */
|
||||
public void checkChildReady() throws ServiceException{
|
||||
int result = ContextStateEnum.READY.getCode();
|
||||
@ -140,7 +124,6 @@ public abstract class BaseRunContext {
|
||||
}
|
||||
/**
|
||||
* 查找子类是否存在开始运行的,如果有则父状态变成running
|
||||
* TODO 逻辑可能有点问题
|
||||
* */
|
||||
public void checkChildRunning() throws ServiceException{
|
||||
Boolean runningFlag = false;
|
||||
|
@ -35,13 +35,10 @@ public class PipelineRunContext extends BaseRunContext{
|
||||
* pipeline 底下有多个阶段,多个阶段包含多个task 不保存第二级context
|
||||
* */
|
||||
@Override
|
||||
public BaseRunContext getRunContext(String id) {
|
||||
if (this.getContextDef().getId().equals(id)) {
|
||||
return this;
|
||||
}
|
||||
public BaseRunContext getChildRunContext(String stageId) {
|
||||
Map<String, BaseRunContext> childContext = getChildContext();
|
||||
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
||||
BaseRunContext childRunContext = entry.getValue().getRunContext(id);
|
||||
BaseRunContext childRunContext = entry.getValue().getChildRunContext(stageId);
|
||||
if (childRunContext!=null) {
|
||||
return childRunContext;
|
||||
}
|
||||
|
@ -4,25 +4,21 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
||||
import cd.casic.framework.commons.exception.ServiceException;
|
||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
||||
import cd.casic.framework.commons.util.collection.CollectionUtils;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class SecondStageRunContext extends BaseRunContext{
|
||||
public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> localVariables) {
|
||||
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), localVariables, new ConcurrentHashMap<>());
|
||||
public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
|
||||
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), globalVariables, localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public BaseRunContext getRunContext(String id) {
|
||||
if (this.getContextDef().getId().equals(id)) {
|
||||
return this;
|
||||
}
|
||||
public BaseRunContext getChildRunContext(String taskId) {
|
||||
Map<String, BaseRunContext> childContext = getChildContext();
|
||||
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
||||
BaseRunContext childRunContext = entry.getValue().getRunContext(id);
|
||||
BaseRunContext childRunContext = entry.getValue().getChildRunContext(taskId);
|
||||
if (childRunContext!=null) {
|
||||
return childRunContext;
|
||||
}
|
||||
|
@ -11,8 +11,8 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class TaskRunContext extends BaseRunContext{
|
||||
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext,Map<String,Object> localVariable) {
|
||||
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(),localVariable, new HashMap<>());
|
||||
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
|
||||
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>());
|
||||
}
|
||||
private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
|
||||
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
|
||||
@ -22,8 +22,8 @@ public class TaskRunContext extends BaseRunContext{
|
||||
* task是最后一层没有下一级所以如果id相同直接返回它自己
|
||||
* */
|
||||
@Override
|
||||
public BaseRunContext getRunContext(String id) {
|
||||
if (StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) {
|
||||
public BaseRunContext getChildRunContext(String id) {
|
||||
if (!StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) {
|
||||
return null;
|
||||
}
|
||||
return this;
|
||||
|
@ -1,50 +0,0 @@
|
||||
package cd.casic.ci.process.engine.worker;
|
||||
|
||||
import cd.casic.ci.common.pipeline.annotation.Plugin;
|
||||
import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
||||
import cd.casic.ci.process.engine.manager.RunContextManager;
|
||||
import cd.casic.ci.process.engine.runContext.BaseRunContext;
|
||||
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
|
||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
||||
import cd.casic.framework.commons.exception.ServiceException;
|
||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import jodd.util.StringUtil;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
|
||||
@Data
|
||||
|
||||
public abstract class BaseWorker implements Runnable{
|
||||
// 一些属性
|
||||
@Resource
|
||||
private RunContextManager contextManager;
|
||||
private String contextKey;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (StringUtils.isEmpty(contextKey)) {
|
||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"请设置当前worker的contextKey");
|
||||
}
|
||||
doWorker(contextKey);
|
||||
}
|
||||
|
||||
public void doWorker(String contextKey){
|
||||
BaseRunContext context = contextManager.getContext(contextKey);
|
||||
if (context instanceof TaskRunContext taskRunContext){
|
||||
try {
|
||||
taskRunContext.changeContextState(ContextStateEnum.RUNNING);
|
||||
execute(taskRunContext);
|
||||
} catch (Exception e) {
|
||||
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
|
||||
}
|
||||
// TODO 执行结束修改context的state,并且通知父类
|
||||
taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING);
|
||||
}
|
||||
}
|
||||
public abstract void execute(TaskRunContext context);
|
||||
|
||||
}
|
@ -1,23 +0,0 @@
|
||||
package cd.casic.ci.process.engine.worker;
|
||||
|
||||
import cd.casic.ci.common.pipeline.annotation.Plugin;
|
||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@Plugin(taskType = "testTask")
|
||||
public class TestWorker extends BaseWorker{
|
||||
|
||||
|
||||
@Override
|
||||
public void execute(TaskRunContext context) {
|
||||
PipBaseElement contextDef = context.getContextDef();
|
||||
String id = contextDef.getId();
|
||||
log.info("==============触发worker执行========");
|
||||
log.info("==========运行context:{}===========", JSON.toJSONString(context));
|
||||
}
|
||||
}
|
@ -3,8 +3,6 @@ package cd.casic.ci.process.process.converter;
|
||||
import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp;
|
||||
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
||||
import org.mapstruct.Mapper;
|
||||
import org.mapstruct.Mapping;
|
||||
import org.mapstruct.Mappings;
|
||||
import org.mapstruct.factory.Mappers;
|
||||
|
||||
import java.util.List;
|
||||
@ -19,6 +17,7 @@ import java.util.List;
|
||||
@Mapper(componentModel = "spring")
|
||||
public interface PipelineConverter {
|
||||
PipelineConverter INSTANCE = Mappers.getMapper(PipelineConverter.class);
|
||||
|
||||
PipelineFindResp toResp(PipPipeline pipPipeline);
|
||||
List<PipelineFindResp> toRespList(List<PipPipeline> pipPipelines);
|
||||
|
||||
|
@ -32,10 +32,22 @@ import java.util.Set;
|
||||
|
||||
@Service
|
||||
public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements TaskService {
|
||||
private static final String basePackage = "cd.casic.ci.process.process.service";
|
||||
private Set<BeanDefinition> candidates;
|
||||
@Resource
|
||||
private PipTaskDao taskDao;
|
||||
|
||||
@PostConstruct
|
||||
public void init(){
|
||||
ClassPathScanningCandidateComponentProvider provider =
|
||||
new ClassPathScanningCandidateComponentProvider(false);
|
||||
provider.addIncludeFilter(new AnnotationTypeFilter(Plugin.class));
|
||||
candidates = provider.findCandidateComponents(basePackage);
|
||||
for (BeanDefinition candidate : candidates) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@Override
|
||||
public void taskTypeExist(String taskType) {
|
||||
|
||||
|
@ -0,0 +1,7 @@
|
||||
package cd.casic.ci.process.process.service.worker;
|
||||
|
||||
import cd.casic.ci.common.pipeline.annotation.Plugin;
|
||||
|
||||
@Plugin
|
||||
public class TestWorker {
|
||||
}
|
@ -1,61 +0,0 @@
|
||||
package cd.casic.server;
|
||||
|
||||
import cd.casic.ci.process.engine.manager.RunContextManager;
|
||||
import cd.casic.ci.process.engine.message.TaskRunMessage;
|
||||
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
|
||||
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
|
||||
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
||||
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
||||
import cd.casic.ci.process.process.dataObject.stage.PipStage;
|
||||
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
||||
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@SpringBootTest(classes = {OpsServerApplication.class})
|
||||
@ActiveProfiles("local")
|
||||
public class RedisMqTest {
|
||||
@Resource
|
||||
RedisMQTemplate redisMQTemplate;
|
||||
@Resource
|
||||
RunContextManager contextManager;
|
||||
@Test
|
||||
public void test01(){
|
||||
System.out.println("h w !");
|
||||
redisMQTemplate.send(new TaskRunMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test02(){
|
||||
System.out.println("h w !");
|
||||
TaskRunMessage taskRunMessage = new TaskRunMessage();
|
||||
PipTask pipTask = new PipTask();
|
||||
pipTask.setTaskType("testTask");
|
||||
pipTask.setId("testTaskId");
|
||||
pipTask.setStageId("testStage");
|
||||
PipPipeline pipeline = new PipPipeline();
|
||||
pipeline.setId("testPipeline");
|
||||
PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline);
|
||||
PipStage stage = new PipStage();
|
||||
stage.setId("testStage");
|
||||
stage.setParentId("testPipeline");
|
||||
SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>());
|
||||
TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext,new HashMap<>());
|
||||
contextManager.contextRegister(pipelineRunContext);
|
||||
contextManager.contextRegister(secondStageRunContext);
|
||||
contextManager.contextRegister(taskRunContext);
|
||||
taskRunMessage.setTask(pipTask);
|
||||
redisMQTemplate.send(taskRunMessage);
|
||||
try {
|
||||
Thread.sleep(5000L);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user