流水线task发生错误是否跳过(初步测试完毕)

This commit is contained in:
even 2025-06-04 16:02:14 +08:00
parent ac2ed85474
commit 6961b4b07c
7 changed files with 112 additions and 10 deletions

View File

@ -0,0 +1,14 @@
package cd.casic.ci.process.engine.constant;
/**
* 用于存放系统变量
* */
public class PipelineBehaviorConstant {
/**
* 发生错误是否跳过存放于taskProperties
* */
public static final String TASK_SKIP_KEY = "taskSkip";
/**
* 流水线是否从上次错误处执行
* */
public static final String PIPELINE_EXECUTE_FROM_ERROR="";
}

View File

@ -53,6 +53,7 @@ public class SerialDispatcher implements BaseDispatcher {
// 如果不为正常执行成功就暂时阻塞直到有状态更改
while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()
&& state.get() != ContextStateEnum.SKIP_TO.getCode()
) {
// Thread.sleep(1000L);
taskRunContext.pause();

View File

@ -23,7 +23,7 @@ public enum ContextStateEnum {
static {
TRANSITIONS.put(INIT, Set.of(READY, SUSPEND, BAD_ENDING, STOP));
TRANSITIONS.put(READY, Set.of(READY,RUNNING, SUSPEND, BAD_ENDING, STOP));
TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP));
TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP,SKIP_TO));
TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP));
//...初始化其他状态转移关系
TRANSITIONS.put(SKIP_TO,Collections.emptySet());

View File

@ -73,9 +73,17 @@ public abstract class BaseRunContext {
ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
if (ContextStateEnum.canGoto(curr,stateEnum)) {
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
// 如果之前有暂停监听状态的.则停止暂停
// unpause();
if (ContextStateEnum.HAPPY_ENDING.equals(stateEnum)
||ContextStateEnum.BAD_ENDING.equals(stateEnum)
||ContextStateEnum.SKIP_TO.equals(stateEnum)) {
this.endTime=LocalDateTime.now();
}
if(this instanceof PipelineRunContext){
log.info("debug专用");
}
callParentChange(stateEnum);
} else {
log.error("非法状态扭转直接忽略");
}
}
public void changeContextStateAndChild(ContextStateEnum stateEnum){
@ -100,10 +108,11 @@ public abstract class BaseRunContext {
if (parentContext==null) {
return;
}
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
this.endTime=LocalDateTime.now();
unpause();
if (ContextStateEnum.HAPPY_ENDING.equals(state)
||ContextStateEnum.BAD_ENDING.equals(state)
||ContextStateEnum.SKIP_TO.equals(state)) {
parentContext.checkChildEnd();
unpause();
} else if(ContextStateEnum.READY.equals(state)){
parentContext.checkChildReady();
} else if(ContextStateEnum.RUNNING.equals(state)){
@ -123,10 +132,15 @@ public abstract class BaseRunContext {
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
BaseRunContext child = entry.getValue();
int state = child.getState().get();
if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(state)&&!ContextStateEnum.BAD_ENDING.getCode().equals(state)) {
if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(state)
&&!ContextStateEnum.BAD_ENDING.getCode().equals(state)
&&!ContextStateEnum.SKIP_TO.getCode().equals(state)) {
return;
}
result&=state;
// 子成员中有一个失败则状态为失败
if (ContextStateEnum.BAD_ENDING.getCode().equals(state)) {
result=state;
}
}
boolean end = false;
if (ContextStateEnum.HAPPY_ENDING.getCode()==result) {
@ -204,4 +218,22 @@ public abstract class BaseRunContext {
}
}
}
@Override
public String toString() {
return "BaseRunContext{" +
"contextDef=" + contextDef +
", childCount=" + childCount +
", state=" + state +
", startTime=" + startTime +
", endTime=" + endTime +
", resourceId='" + resourceId + '\'' +
", targetVersionId='" + targetVersionId + '\'' +
", targetType='" + targetType + '\'' +
", globalVariables=" + globalVariables +
", localVariables=" + localVariables +
", childContext=" + childContext +
", countDownLatch=" + countDownLatch +
'}';
}
}

View File

@ -0,0 +1,31 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.process.common.WorkAtom;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.commons.exception.ServiceException;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
@WorkAtom(taskType = "ERROR_SUCCESS")
public class TestErrorWorker extends BaseWorker {
@Override
public void execute(TaskRunContext context) {
if (context.getContextDef() instanceof PipTask task) {
Map<String, Object> taskProperties = task.getTaskProperties();
String s = String.valueOf(taskProperties.get("buildScript"));
if ("error".equals(s)) {
append(context,"执行报错");
throw new ServiceException(111,"模拟测试报错");
}
append(context,"执行成功");
}
}
}

View File

@ -3,6 +3,7 @@ package cd.casic.ci.process.engine.worker.base;
import cd.casic.ci.process.constant.CommandConstant;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.manager.RunContextManager;
@ -13,6 +14,7 @@ import cd.casic.ci.process.enums.MachineSystemEnum;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.ci.process.process.service.machine.MachineInfoService;
import cd.casic.ci.process.ssh.SshClient;
@ -29,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
import javax.swing.text.StringContent;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@Data
@ -68,8 +71,19 @@ public abstract class BaseWorker implements Runnable{
} catch (Exception e) {
log.error("================worker执行报错",e);
// todo 根据配置决定失败是跳过还是直接失败如果直接跳过状态改为跳过如果直接失败状态就改为失败
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
append(context,e.getMessage());
PipBaseElement contextDef = taskRunContext.getContextDef();
if (contextDef instanceof PipTask task) {
Map<String, Object> taskProperties = task.getTaskProperties();
Object taskSkip = taskProperties.get(PipelineBehaviorConstant.TASK_SKIP_KEY);
if (Boolean.TRUE.equals(taskSkip)) {
taskRunContext.changeContextState(ContextStateEnum.SKIP_TO);
append(context,e.getMessage());
} else{
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
append(context,e.getMessage());
}
}
return;
}
// TODO 执行结束修改context的state,并且通知父类

View File

@ -24,4 +24,14 @@ public class PipelineExecuteTest {
public void getRunState(){
pipelineService.getPipelineRunState("716299522803896320");
}
@Test
public void taskSkipExecute(){
// 这个流水线包含了故意报错的worker可以验证跳过效果
pipelineExecutor.execute("718104543308681216");
}
@Test
public void taskSkipGetState(){
// 这个流水线包含了故意报错的worker可以验证跳过效果
pipelineService.getPipelineRunState("718104543308681216");
}
}