Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
e25c9fce4b
@ -0,0 +1,19 @@
|
|||||||
|
package cd.casic.ci.api;
|
||||||
|
|
||||||
|
import cd.casic.ci.process.util.CryptogramUtil;
|
||||||
|
import cd.casic.ci.process.util.SftpUploadUtil;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
@RestController("/test")
|
||||||
|
public class TestController {
|
||||||
|
@PostMapping("/upload")
|
||||||
|
public void uploadTest(){
|
||||||
|
|
||||||
|
try {
|
||||||
|
SftpUploadUtil.uploadFileViaSftp("175.6.27.228",22,"hnidc", CryptogramUtil.doDecrypt("cb2ee50ff663312808773f1698b801d2f9d6073f9684473e090767edbc2dba93"),null,"/ops/ops-pro/ops-server.jar","/home/casic/706/ai_test_527","ops-server.jar");
|
||||||
|
} catch (SftpUploadUtil.SftpUploadException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
package cd.casic.ci.process.engine.constant;
|
||||||
|
/**
|
||||||
|
* 用于存放系统变量
|
||||||
|
* */
|
||||||
|
public class PipelineBehaviorConstant {
|
||||||
|
/**
|
||||||
|
* 发生错误是否跳过,存放于taskProperties
|
||||||
|
* */
|
||||||
|
public static final String TASK_SKIP_KEY = "taskSkip";
|
||||||
|
/**
|
||||||
|
* 流水线是否从上次错误处执行
|
||||||
|
* */
|
||||||
|
public static final String PIPELINE_EXECUTE_FROM_ERROR="";
|
||||||
|
}
|
@ -53,16 +53,12 @@ public class ParallelDispatcher implements BaseDispatcher{
|
|||||||
}
|
}
|
||||||
// 等待当前阶段执行
|
// 等待当前阶段执行
|
||||||
latch.await();
|
latch.await();
|
||||||
|
// TODO 检查是否全部执行成功 ,目前没有逻辑就是忽略错误
|
||||||
|
// 当前执行失败
|
||||||
if (pipelineRunContext.getState().get()== ContextStateEnum.BAD_ENDING.getCode()) {
|
if (pipelineRunContext.getState().get()== ContextStateEnum.BAD_ENDING.getCode()) {
|
||||||
log.error("并行执行停止");
|
log.error("并行执行停止");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// TODO 检查是否全部执行成功 ,目前没有逻辑就是忽略错误
|
|
||||||
// 当前执行失败
|
|
||||||
// while (pipelineRunContext.getState().get() != ContextStateEnum.RUNNING.getCode()) {
|
|
||||||
// // 想办法借助工具类 或者直接wait
|
|
||||||
// pipelineRunContext.pause();
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
|
@ -53,6 +53,7 @@ public class SerialDispatcher implements BaseDispatcher {
|
|||||||
// 如果不为正常执行成功就暂时阻塞直到有状态更改
|
// 如果不为正常执行成功就暂时阻塞直到有状态更改
|
||||||
while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
|
while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
|
||||||
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()
|
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()
|
||||||
|
&& state.get() != ContextStateEnum.SKIP_TO.getCode()
|
||||||
) {
|
) {
|
||||||
// Thread.sleep(1000L);
|
// Thread.sleep(1000L);
|
||||||
taskRunContext.pause();
|
taskRunContext.pause();
|
||||||
|
@ -12,7 +12,8 @@ public enum ContextStateEnum {
|
|||||||
SUSPEND(3,"挂起"),
|
SUSPEND(3,"挂起"),
|
||||||
STOP(-1,"停止"),
|
STOP(-1,"停止"),
|
||||||
HAPPY_ENDING(4,"执行成功"),
|
HAPPY_ENDING(4,"执行成功"),
|
||||||
BAD_ENDING(5,"执行失败")
|
BAD_ENDING(5,"执行失败"),
|
||||||
|
SKIP_TO(6,"跳过")
|
||||||
;
|
;
|
||||||
|
|
||||||
private Integer code;
|
private Integer code;
|
||||||
@ -22,9 +23,10 @@ public enum ContextStateEnum {
|
|||||||
static {
|
static {
|
||||||
TRANSITIONS.put(INIT, Set.of(READY, SUSPEND, BAD_ENDING, STOP));
|
TRANSITIONS.put(INIT, Set.of(READY, SUSPEND, BAD_ENDING, STOP));
|
||||||
TRANSITIONS.put(READY, Set.of(READY,RUNNING, SUSPEND, BAD_ENDING, STOP));
|
TRANSITIONS.put(READY, Set.of(READY,RUNNING, SUSPEND, BAD_ENDING, STOP));
|
||||||
TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP));
|
TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP,SKIP_TO));
|
||||||
TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP));
|
TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP));
|
||||||
//...初始化其他状态转移关系
|
//...初始化其他状态转移关系
|
||||||
|
TRANSITIONS.put(SKIP_TO,Collections.emptySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
ContextStateEnum(Integer code, String msg) {
|
ContextStateEnum(Integer code, String msg) {
|
||||||
|
@ -12,6 +12,7 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
|||||||
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
||||||
import cd.casic.framework.commons.exception.ServiceException;
|
import cd.casic.framework.commons.exception.ServiceException;
|
||||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@ -65,7 +66,8 @@ public class DefaultRunContextManager implements RunContextManager {
|
|||||||
if (contextMap.containsKey(id)) {
|
if (contextMap.containsKey(id)) {
|
||||||
PipelineRunContext oldPipeline = contextMap.get(id);
|
PipelineRunContext oldPipeline = contextMap.get(id);
|
||||||
oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING);
|
oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING);
|
||||||
List<PipTask> taskList = taskDao.selectList("pipelineId", id);
|
LambdaQueryWrapper<PipTask> wrapper = new LambdaQueryWrapper<>();
|
||||||
|
List<PipTask> taskList = taskDao.selectList(wrapper);
|
||||||
List<String> taskIdList = taskList.stream().map(PipTask::getId).toList();
|
List<String> taskIdList = taskList.stream().map(PipTask::getId).toList();
|
||||||
// 清空上一次的日志
|
// 清空上一次的日志
|
||||||
loggerManager.flushMemory(taskIdList);
|
loggerManager.flushMemory(taskIdList);
|
||||||
|
@ -30,7 +30,7 @@ public abstract class BaseRunContext {
|
|||||||
/**
|
/**
|
||||||
* 运行状态
|
* 运行状态
|
||||||
* */
|
* */
|
||||||
private AtomicInteger state;
|
private final AtomicInteger state;
|
||||||
/**
|
/**
|
||||||
* 启动时间
|
* 启动时间
|
||||||
* */
|
* */
|
||||||
@ -73,9 +73,17 @@ public abstract class BaseRunContext {
|
|||||||
ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
|
ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
|
||||||
if (ContextStateEnum.canGoto(curr,stateEnum)) {
|
if (ContextStateEnum.canGoto(curr,stateEnum)) {
|
||||||
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
|
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
|
||||||
// 如果之前有暂停监听状态的.则停止暂停
|
if (ContextStateEnum.HAPPY_ENDING.equals(stateEnum)
|
||||||
// unpause();
|
||ContextStateEnum.BAD_ENDING.equals(stateEnum)
|
||||||
|
||ContextStateEnum.SKIP_TO.equals(stateEnum)) {
|
||||||
|
this.endTime=LocalDateTime.now();
|
||||||
|
}
|
||||||
|
if(this instanceof PipelineRunContext){
|
||||||
|
log.info("debug专用");
|
||||||
|
}
|
||||||
callParentChange(stateEnum);
|
callParentChange(stateEnum);
|
||||||
|
} else {
|
||||||
|
log.error("非法状态扭转直接忽略");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
public void changeContextStateAndChild(ContextStateEnum stateEnum){
|
public void changeContextStateAndChild(ContextStateEnum stateEnum){
|
||||||
@ -90,10 +98,6 @@ public abstract class BaseRunContext {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 保证一直都操作同一个引用的值
|
|
||||||
private void setState(AtomicInteger state) {
|
|
||||||
this.state = state;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取当前或者子上下文
|
* 获取当前或者子上下文
|
||||||
@ -104,10 +108,11 @@ public abstract class BaseRunContext {
|
|||||||
if (parentContext==null) {
|
if (parentContext==null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
|
if (ContextStateEnum.HAPPY_ENDING.equals(state)
|
||||||
this.endTime=LocalDateTime.now();
|
||ContextStateEnum.BAD_ENDING.equals(state)
|
||||||
unpause();
|
||ContextStateEnum.SKIP_TO.equals(state)) {
|
||||||
parentContext.checkChildEnd();
|
parentContext.checkChildEnd();
|
||||||
|
unpause();
|
||||||
} else if(ContextStateEnum.READY.equals(state)){
|
} else if(ContextStateEnum.READY.equals(state)){
|
||||||
parentContext.checkChildReady();
|
parentContext.checkChildReady();
|
||||||
} else if(ContextStateEnum.RUNNING.equals(state)){
|
} else if(ContextStateEnum.RUNNING.equals(state)){
|
||||||
@ -127,10 +132,15 @@ public abstract class BaseRunContext {
|
|||||||
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
||||||
BaseRunContext child = entry.getValue();
|
BaseRunContext child = entry.getValue();
|
||||||
int state = child.getState().get();
|
int state = child.getState().get();
|
||||||
if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(state)&&!ContextStateEnum.BAD_ENDING.getCode().equals(state)) {
|
if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(state)
|
||||||
|
&&!ContextStateEnum.BAD_ENDING.getCode().equals(state)
|
||||||
|
&&!ContextStateEnum.SKIP_TO.getCode().equals(state)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
result&=state;
|
// 子成员中有一个失败则状态为失败
|
||||||
|
if (ContextStateEnum.BAD_ENDING.getCode().equals(state)) {
|
||||||
|
result=state;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
boolean end = false;
|
boolean end = false;
|
||||||
if (ContextStateEnum.HAPPY_ENDING.getCode()==result) {
|
if (ContextStateEnum.HAPPY_ENDING.getCode()==result) {
|
||||||
@ -208,4 +218,22 @@ public abstract class BaseRunContext {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "BaseRunContext{" +
|
||||||
|
"contextDef=" + contextDef +
|
||||||
|
", childCount=" + childCount +
|
||||||
|
", state=" + state +
|
||||||
|
", startTime=" + startTime +
|
||||||
|
", endTime=" + endTime +
|
||||||
|
", resourceId='" + resourceId + '\'' +
|
||||||
|
", targetVersionId='" + targetVersionId + '\'' +
|
||||||
|
", targetType='" + targetType + '\'' +
|
||||||
|
", globalVariables=" + globalVariables +
|
||||||
|
", localVariables=" + localVariables +
|
||||||
|
", childContext=" + childContext +
|
||||||
|
", countDownLatch=" + countDownLatch +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
|||||||
import cd.casic.ci.process.engine.worker.base.BaseWorker;
|
import cd.casic.ci.process.engine.worker.base.BaseWorker;
|
||||||
import cd.casic.ci.process.process.service.machine.MachineInfoService;
|
import cd.casic.ci.process.process.service.machine.MachineInfoService;
|
||||||
import cd.casic.ci.process.process.service.target.TargetVersionService;
|
import cd.casic.ci.process.process.service.target.TargetVersionService;
|
||||||
|
import cd.casic.ci.process.util.SftpUploadUtil;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@ -0,0 +1,31 @@
|
|||||||
|
package cd.casic.ci.process.engine.worker;
|
||||||
|
|
||||||
|
|
||||||
|
import cd.casic.ci.process.common.WorkAtom;
|
||||||
|
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
||||||
|
import cd.casic.ci.process.engine.worker.base.BaseWorker;
|
||||||
|
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||||
|
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
||||||
|
import cd.casic.framework.commons.exception.ServiceException;
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@WorkAtom(taskType = "ERROR_SUCCESS")
|
||||||
|
public class TestErrorWorker extends BaseWorker {
|
||||||
|
@Override
|
||||||
|
public void execute(TaskRunContext context) {
|
||||||
|
if (context.getContextDef() instanceof PipTask task) {
|
||||||
|
Map<String, Object> taskProperties = task.getTaskProperties();
|
||||||
|
String s = String.valueOf(taskProperties.get("buildScript"));
|
||||||
|
if ("error".equals(s)) {
|
||||||
|
append(context,"执行报错");
|
||||||
|
throw new ServiceException(111,"模拟测试报错");
|
||||||
|
}
|
||||||
|
append(context,"执行成功");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -3,6 +3,7 @@ package cd.casic.ci.process.engine.worker.base;
|
|||||||
|
|
||||||
import cd.casic.ci.process.constant.CommandConstant;
|
import cd.casic.ci.process.constant.CommandConstant;
|
||||||
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
|
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
|
||||||
|
import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant;
|
||||||
import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
||||||
import cd.casic.ci.process.engine.manager.LoggerManager;
|
import cd.casic.ci.process.engine.manager.LoggerManager;
|
||||||
import cd.casic.ci.process.engine.manager.RunContextManager;
|
import cd.casic.ci.process.engine.manager.RunContextManager;
|
||||||
@ -13,6 +14,7 @@ import cd.casic.ci.process.enums.MachineSystemEnum;
|
|||||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||||
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
|
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
|
||||||
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
|
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
|
||||||
|
import cd.casic.ci.process.process.dataObject.task.PipTask;
|
||||||
import cd.casic.ci.process.process.service.machine.MachineInfoService;
|
import cd.casic.ci.process.process.service.machine.MachineInfoService;
|
||||||
|
|
||||||
import cd.casic.ci.process.ssh.SshClient;
|
import cd.casic.ci.process.ssh.SshClient;
|
||||||
@ -29,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||||||
import javax.swing.text.StringContent;
|
import javax.swing.text.StringContent;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@ -67,8 +70,20 @@ public abstract class BaseWorker implements Runnable{
|
|||||||
execute(taskRunContext);
|
execute(taskRunContext);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("================worker执行报错:",e);
|
log.error("================worker执行报错:",e);
|
||||||
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
|
// todo 根据配置决定失败是跳过还是直接失败,如果直接跳过,状态改为跳过,如果直接失败状态就改为失败
|
||||||
append(context,e.getMessage());
|
PipBaseElement contextDef = taskRunContext.getContextDef();
|
||||||
|
if (contextDef instanceof PipTask task) {
|
||||||
|
Map<String, Object> taskProperties = task.getTaskProperties();
|
||||||
|
Object taskSkip = taskProperties.get(PipelineBehaviorConstant.TASK_SKIP_KEY);
|
||||||
|
if (Boolean.TRUE.equals(taskSkip)) {
|
||||||
|
taskRunContext.changeContextState(ContextStateEnum.SKIP_TO);
|
||||||
|
append(context,e.getMessage());
|
||||||
|
} else{
|
||||||
|
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
|
||||||
|
append(context,e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// TODO 执行结束修改context的state,并且通知父类
|
// TODO 执行结束修改context的state,并且通知父类
|
||||||
|
@ -24,4 +24,14 @@ public class PipelineExecuteTest {
|
|||||||
public void getRunState(){
|
public void getRunState(){
|
||||||
pipelineService.getPipelineRunState("716299522803896320");
|
pipelineService.getPipelineRunState("716299522803896320");
|
||||||
}
|
}
|
||||||
|
@Test
|
||||||
|
public void taskSkipExecute(){
|
||||||
|
// 这个流水线包含了故意报错的worker,可以验证跳过效果
|
||||||
|
pipelineExecutor.execute("718104543308681216");
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void taskSkipGetState(){
|
||||||
|
// 这个流水线包含了故意报错的worker,可以验证跳过效果
|
||||||
|
pipelineService.getPipelineRunState("718104543308681216");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user