执行逻辑修改-每次启动标记上一次执行为执行失败

This commit is contained in:
even 2025-05-30 14:52:57 +08:00
parent 82f1aaa72f
commit ead82b946d
6 changed files with 44 additions and 4 deletions

View File

@ -2,6 +2,7 @@ package cd.casic.ci.process.engine.dispatcher;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import lombok.extern.slf4j.Slf4j;
import java.util.List;

View File

@ -8,6 +8,7 @@ import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;
@ -19,7 +20,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class ParallelDispatcher implements BaseDispatcher{
private List<PipStage> firstStageList;
@ -34,7 +35,6 @@ public class ParallelDispatcher implements BaseDispatcher{
this.pipelineRunContext = context;
this.stageIndex = 0;
this.runContextManager = contextManager;
contextManager.contextRegister(context);
this.redisMQTemplate = redisMQTemplate;
this.taskExecutor = taskExecutor;
}
@ -59,6 +59,10 @@ public class ParallelDispatcher implements BaseDispatcher{
}
// 等待当前阶段执行
latch.await();
if (pipelineRunContext.getState().get()== ContextStateEnum.BAD_ENDING.getCode()) {
log.error("并行执行停止");
break;
}
// TODO 检查是否全部执行成功 目前没有逻辑就是忽略错误
// 当前执行失败
// while (pipelineRunContext.getState().get() != ContextStateEnum.RUNNING.getCode()) {

View File

@ -11,12 +11,13 @@ import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class SerialDispatcher implements BaseDispatcher {
private SecondStageRunContext stageRunContext;
private List<PipTask> taskList;
@ -57,6 +58,10 @@ public class SerialDispatcher implements BaseDispatcher {
taskRunContext.pause();
}
//
if (state.get()== ContextStateEnum.BAD_ENDING.getCode()) {
log.error("串行执行停止");
break;
}
}
}

View File

@ -61,6 +61,7 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
}
// 如果要做 容灾就需要重新将数据库存的记录按顺序加载入
PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
runContextManager.contextRegister(pipelineRunContext);
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
parallelExecutor.execute(parallelDispatcher);
return pipelineRunContext;

View File

@ -1,5 +1,6 @@
package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
@ -10,6 +11,7 @@ import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -51,6 +53,10 @@ public class DefaultRunContextManager implements RunContextManager {
String id = contextDef.getId();
BaseRunContext parentContext = context.getParentContext();
if (context instanceof PipelineRunContext pipelineRunContext) {
if (contextMap.containsKey(id)) {
PipelineRunContext oldPipeline = contextMap.get(id);
oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING);
}
contextMap.put(id,pipelineRunContext);
} else {
if (parentContext==null) {
@ -84,4 +90,12 @@ public class DefaultRunContextManager implements RunContextManager {
}
return null;
}
public void changePipelineState(String pipelineId,ContextStateEnum stateEnum){
PipelineRunContext pipelineRunContext = contextMap.get(pipelineId);
if (pipelineRunContext==null) {
return;
}
pipelineRunContext.changeContextStateAndChild(stateEnum);
}
}

View File

@ -8,8 +8,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.annotation.Transient;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -72,10 +74,22 @@ public abstract class BaseRunContext {
if (ContextStateEnum.canGoto(curr,stateEnum)) {
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
// 如果之前有暂停监听状态的.则停止暂停
unpause();
// unpause();
callParentChange(stateEnum);
}
}
public void changeContextStateAndChild(ContextStateEnum stateEnum){
ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
if (ContextStateEnum.canGoto(curr,stateEnum)) {
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
Collection<BaseRunContext> values = this.getChildContext().values();
if (!CollectionUtils.isEmpty(values)) {
for (BaseRunContext value : values) {
value.changeContextStateAndChild(stateEnum);
}
}
}
}
// 保证一直都操作同一个引用的值
private void setState(AtomicInteger state) {
this.state = state;
@ -92,6 +106,7 @@ public abstract class BaseRunContext {
}
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
this.endTime=LocalDateTime.now();
unpause();
parentContext.checkChildEnd();
} else if(ContextStateEnum.READY.equals(state)){
parentContext.checkChildReady();