执行流程调试暂时完毕
This commit is contained in:
parent
1ac1efed22
commit
b050ca2acb
2
.idea/compiler.xml
generated
2
.idea/compiler.xml
generated
@ -14,9 +14,9 @@
|
|||||||
<outputRelativeToContentRoot value="true" />
|
<outputRelativeToContentRoot value="true" />
|
||||||
<processorPath useClasspath="false">
|
<processorPath useClasspath="false">
|
||||||
<entry name="$PROJECT_DIR$/../../apache-maven-3.8.6-bin/repository/org/springframework/boot/spring-boot-configuration-processor/3.3.4/spring-boot-configuration-processor-3.3.4.jar" />
|
<entry name="$PROJECT_DIR$/../../apache-maven-3.8.6-bin/repository/org/springframework/boot/spring-boot-configuration-processor/3.3.4/spring-boot-configuration-processor-3.3.4.jar" />
|
||||||
<entry name="$PROJECT_DIR$/../../apache-maven-3.8.6-bin/repository/org/projectlombok/lombok/1.18.34/lombok-1.18.34.jar" />
|
|
||||||
<entry name="$PROJECT_DIR$/../../apache-maven-3.8.6-bin/repository/org/mapstruct/mapstruct-processor/1.6.2/mapstruct-processor-1.6.2.jar" />
|
<entry name="$PROJECT_DIR$/../../apache-maven-3.8.6-bin/repository/org/mapstruct/mapstruct-processor/1.6.2/mapstruct-processor-1.6.2.jar" />
|
||||||
<entry name="$PROJECT_DIR$/../../apache-maven-3.8.6-bin/repository/org/mapstruct/mapstruct/1.6.2/mapstruct-1.6.2.jar" />
|
<entry name="$PROJECT_DIR$/../../apache-maven-3.8.6-bin/repository/org/mapstruct/mapstruct/1.6.2/mapstruct-1.6.2.jar" />
|
||||||
|
<entry name="$PROJECT_DIR$/../../apache-maven-3.8.6-bin/repository/org/projectlombok/lombok/1.18.34/lombok-1.18.34.jar" />
|
||||||
</processorPath>
|
</processorPath>
|
||||||
<module name="spring-boot-starter-protection" />
|
<module name="spring-boot-starter-protection" />
|
||||||
<module name="module-ci-environment" />
|
<module name="module-ci-environment" />
|
||||||
|
@ -52,7 +52,7 @@ public abstract class BaseDO implements Serializable, TransPojo {
|
|||||||
/**
|
/**
|
||||||
* 是否删除
|
* 是否删除
|
||||||
*/
|
*/
|
||||||
@TableLogic
|
// @TableLogic
|
||||||
private Boolean deleted;
|
private Boolean deleted;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -51,7 +51,7 @@ public class ParallelDispatcher implements BaseDispatcher{
|
|||||||
CountDownLatch latch = new CountDownLatch(stageList.size());
|
CountDownLatch latch = new CountDownLatch(stageList.size());
|
||||||
for (PipStage secondStage : stageList) {
|
for (PipStage secondStage : stageList) {
|
||||||
// 二阶段下所有task是串行所以不用关心线程安全相关信息
|
// 二阶段下所有task是串行所以不用关心线程安全相关信息
|
||||||
SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new ConcurrentHashMap<>());
|
SecondStageRunContext context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>());
|
||||||
runContextManager.contextRegister(context);
|
runContextManager.contextRegister(context);
|
||||||
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate);
|
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate);
|
||||||
// 给线程池进行执行
|
// 给线程池进行执行
|
||||||
|
@ -2,65 +2,46 @@ package cd.casic.ci.process.engine.enums;
|
|||||||
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.*;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
|
||||||
@Getter
|
@Getter
|
||||||
public enum ContextStateEnum {
|
public enum ContextStateEnum {
|
||||||
INIT(0,"初始化", new HashSet<>(){
|
INIT(0,"初始化"),
|
||||||
{
|
READY(1,"就绪"),
|
||||||
add(READY);
|
RUNNING(2,"运行"),
|
||||||
add(SUSPEND);
|
SUSPEND(3,"挂起"),
|
||||||
add(BAD_ENDING);
|
STOP(-1,"停止"),
|
||||||
add(STOP);
|
HAPPY_ENDING(4,"执行成功"),
|
||||||
}
|
BAD_ENDING(5,"执行失败")
|
||||||
}),
|
|
||||||
READY(1,"就绪", new HashSet<>(){
|
|
||||||
{
|
|
||||||
add(RUNNING);
|
|
||||||
add(SUSPEND);
|
|
||||||
add(BAD_ENDING);
|
|
||||||
add(STOP);
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
RUNNING(2,"运行", new HashSet<>(){
|
|
||||||
{
|
|
||||||
add(SUSPEND);
|
|
||||||
add(STOP);
|
|
||||||
add(HAPPY_ENDING);
|
|
||||||
add(BAD_ENDING);
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
SUSPEND(3,"挂起", new HashSet<>(){
|
|
||||||
{
|
|
||||||
add(INIT);
|
|
||||||
add(READY);
|
|
||||||
add(RUNNING);
|
|
||||||
add(BAD_ENDING);
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
STOP(-1,"停止", new HashSet<>()),
|
|
||||||
HAPPY_ENDING(4,"执行成功", new HashSet<>()),
|
|
||||||
BAD_ENDING(5,"执行失败", new HashSet<>())
|
|
||||||
;
|
;
|
||||||
|
|
||||||
private Integer code;
|
private Integer code;
|
||||||
private String msg;
|
private String msg;
|
||||||
/**
|
private static final Map<ContextStateEnum, Set<ContextStateEnum>> TRANSITIONS = new EnumMap<>(ContextStateEnum.class);
|
||||||
* 包含当前所有合法的下一个状态
|
|
||||||
* */
|
|
||||||
private Set<ContextStateEnum> nextStep;
|
|
||||||
|
|
||||||
ContextStateEnum(Integer code, String msg, Set<ContextStateEnum> nextStep) {
|
static {
|
||||||
|
TRANSITIONS.put(INIT, Set.of(READY, SUSPEND, BAD_ENDING, STOP));
|
||||||
|
TRANSITIONS.put(READY, Set.of(READY,RUNNING, SUSPEND, BAD_ENDING, STOP));
|
||||||
|
TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP));
|
||||||
|
TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP));
|
||||||
|
//...初始化其他状态转移关系
|
||||||
|
}
|
||||||
|
|
||||||
|
ContextStateEnum(Integer code, String msg) {
|
||||||
this.code = code;
|
this.code = code;
|
||||||
this.msg = msg;
|
this.msg = msg;
|
||||||
this.nextStep = nextStep;
|
|
||||||
}
|
}
|
||||||
public static Boolean canGoto(ContextStateEnum from,ContextStateEnum to){
|
public static Boolean canGoto(ContextStateEnum from,ContextStateEnum to){
|
||||||
|
try {
|
||||||
if (Objects.isNull(from) || Objects.isNull(to)) {
|
if (Objects.isNull(from) || Objects.isNull(to)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return from.nextStep.contains(to);
|
return TRANSITIONS.get(from).contains(to);
|
||||||
|
} catch (Exception e){
|
||||||
|
System.out.println("");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
public static ContextStateEnum getByCode(Integer code){
|
public static ContextStateEnum getByCode(Integer code){
|
||||||
for (ContextStateEnum value : values()) {
|
for (ContextStateEnum value : values()) {
|
||||||
|
@ -52,8 +52,12 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
|
|||||||
if (CollectionUtils.isEmpty(mainStage)) {
|
if (CollectionUtils.isEmpty(mainStage)) {
|
||||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
|
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
|
||||||
}
|
}
|
||||||
|
Integer childCount= 0;
|
||||||
|
for (PipStage stage : mainStage) {
|
||||||
|
childCount+=stage.getStageList().size();
|
||||||
|
}
|
||||||
// 如果要做 容灾就需要重新将数据库存的记录按顺序加载入
|
// 如果要做 容灾就需要重新将数据库存的记录按顺序加载入
|
||||||
PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
|
PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
|
||||||
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
|
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
|
||||||
parallelExecutor.execute(parallelDispatcher);
|
parallelExecutor.execute(parallelDispatcher);
|
||||||
return pipelineRunContext;
|
return pipelineRunContext;
|
||||||
|
@ -76,6 +76,7 @@ public class DefaultWorkerManager extends AbstractRedisStreamMessageListener<Tas
|
|||||||
workerExecutor.execute(baseWorker);
|
workerExecutor.execute(baseWorker);
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
// TODO 后期可以考虑专门整一个队列
|
// TODO 后期可以考虑专门整一个队列
|
||||||
|
log.error("=======================workerManager:",e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,7 +4,9 @@ import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
|||||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||||
import cd.casic.framework.commons.exception.ServiceException;
|
import cd.casic.framework.commons.exception.ServiceException;
|
||||||
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.springframework.data.annotation.Transient;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -16,7 +18,10 @@ public abstract class BaseRunContext {
|
|||||||
* 当前上下文的定义
|
* 当前上下文的定义
|
||||||
* */
|
* */
|
||||||
private PipBaseElement contextDef;
|
private PipBaseElement contextDef;
|
||||||
|
@Transient
|
||||||
|
@JsonIgnore
|
||||||
private BaseRunContext parentContext;
|
private BaseRunContext parentContext;
|
||||||
|
private Integer childCount;
|
||||||
/**
|
/**
|
||||||
* 运行状态
|
* 运行状态
|
||||||
* */
|
* */
|
||||||
@ -42,8 +47,9 @@ public abstract class BaseRunContext {
|
|||||||
private Map<String,Object> localVariables;
|
private Map<String,Object> localVariables;
|
||||||
private Map<String,BaseRunContext> childContext;
|
private Map<String,BaseRunContext> childContext;
|
||||||
|
|
||||||
public BaseRunContext(PipBaseElement contextDef,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
|
public BaseRunContext(PipBaseElement contextDef,Integer childCount,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
|
||||||
this.contextDef = contextDef;
|
this.contextDef = contextDef;
|
||||||
|
this.childCount = childCount;
|
||||||
this.parentContext = parentContext;
|
this.parentContext = parentContext;
|
||||||
this.state = new AtomicInteger(ContextStateEnum.INIT.getCode());
|
this.state = new AtomicInteger(ContextStateEnum.INIT.getCode());
|
||||||
this.startTime = startTime;
|
this.startTime = startTime;
|
||||||
@ -88,6 +94,10 @@ public abstract class BaseRunContext {
|
|||||||
* 查找子类是否全部完成,如果子类全部完成则父类也完成
|
* 查找子类是否全部完成,如果子类全部完成则父类也完成
|
||||||
* */
|
* */
|
||||||
public void checkChildEnd() throws ServiceException{
|
public void checkChildEnd() throws ServiceException{
|
||||||
|
Map<String, BaseRunContext> childContext = getChildContext();
|
||||||
|
if (childContext.size()!=childCount) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
int result = ContextStateEnum.HAPPY_ENDING.getCode();
|
int result = ContextStateEnum.HAPPY_ENDING.getCode();
|
||||||
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
||||||
BaseRunContext child = entry.getValue();
|
BaseRunContext child = entry.getValue();
|
||||||
@ -100,18 +110,16 @@ public abstract class BaseRunContext {
|
|||||||
boolean end = false;
|
boolean end = false;
|
||||||
if (ContextStateEnum.HAPPY_ENDING.getCode()==result) {
|
if (ContextStateEnum.HAPPY_ENDING.getCode()==result) {
|
||||||
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.HAPPY_ENDING)) {
|
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.HAPPY_ENDING)) {
|
||||||
this.state.compareAndExchange(state.get(),ContextStateEnum.HAPPY_ENDING.getCode());
|
this.changeContextState(ContextStateEnum.HAPPY_ENDING);
|
||||||
end = true;
|
end = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.BAD_ENDING)) {
|
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.BAD_ENDING)) {
|
||||||
this.state.compareAndExchange(state.get(),ContextStateEnum.BAD_ENDING.getCode());
|
this.changeContextState(ContextStateEnum.BAD_ENDING);
|
||||||
end = true;
|
end = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (end) {
|
// 当前执行结束看看是否要执行后置处理
|
||||||
this.changeContextState(ContextStateEnum.getByCode(result));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* 查找子类是否全部就绪,如果子类全部完成则父类也就绪
|
* 查找子类是否全部就绪,如果子类全部完成则父类也就绪
|
||||||
@ -127,16 +135,11 @@ public abstract class BaseRunContext {
|
|||||||
}
|
}
|
||||||
result&=state;
|
result&=state;
|
||||||
}
|
}
|
||||||
boolean ready = false;
|
if (ContextStateEnum.READY.getCode().equals(result)) {
|
||||||
if (ContextStateEnum.READY.getCode()==result) {
|
|
||||||
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.READY)) {
|
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.READY)) {
|
||||||
this.state.compareAndExchange(state.get(),ContextStateEnum.READY.getCode());
|
this.changeContextState(ContextStateEnum.READY);
|
||||||
ready = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!ready) {
|
|
||||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* 查找子类是否存在开始运行的,如果有则父状态变成running
|
* 查找子类是否存在开始运行的,如果有则父状态变成running
|
||||||
@ -147,14 +150,14 @@ public abstract class BaseRunContext {
|
|||||||
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
|
||||||
BaseRunContext child = entry.getValue();
|
BaseRunContext child = entry.getValue();
|
||||||
int state = child.getState().get();
|
int state = child.getState().get();
|
||||||
if (ContextStateEnum.READY.getCode().equals(state)) {
|
if (ContextStateEnum.RUNNING.getCode().equals(state)) {
|
||||||
runningFlag = true;
|
runningFlag = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (runningFlag) {
|
if (runningFlag) {
|
||||||
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING)) {
|
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING)) {
|
||||||
this.state.compareAndExchange(state.get(),ContextStateEnum.RUNNING.getCode());
|
this.changeContextState(ContextStateEnum.RUNNING);
|
||||||
} else{
|
} else{
|
||||||
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
|
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package cd.casic.ci.process.engine.runContext;
|
package cd.casic.ci.process.engine.runContext;
|
||||||
|
|
||||||
|
import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
||||||
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||||
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
|
||||||
import cd.casic.framework.commons.exception.ServiceException;
|
import cd.casic.framework.commons.exception.ServiceException;
|
||||||
@ -11,16 +12,17 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
|
|
||||||
public class PipelineRunContext extends BaseRunContext{
|
public class PipelineRunContext extends BaseRunContext{
|
||||||
|
|
||||||
public PipelineRunContext(PipPipeline pipeline) {
|
public PipelineRunContext(PipPipeline pipeline,Integer childCount) {
|
||||||
this(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
|
this(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
public PipelineRunContext(BaseRunContext parentContext, PipPipeline pipeline, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
|
public PipelineRunContext(PipPipeline pipeline,Integer childCount,BaseRunContext parentContext, Map<String, Object> globalVariables, Map<String, Object> localVariables) {
|
||||||
this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>());
|
this(pipeline,childCount,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
private PipelineRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
|
private PipelineRunContext(PipBaseElement contextDef,Integer childCount, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
|
||||||
super( contextDef
|
super( contextDef
|
||||||
|
,childCount
|
||||||
,parentContext
|
,parentContext
|
||||||
,startTime
|
,startTime
|
||||||
, resourceId
|
, resourceId
|
||||||
|
@ -11,8 +11,8 @@ import java.util.Map;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class SecondStageRunContext extends BaseRunContext{
|
public class SecondStageRunContext extends BaseRunContext{
|
||||||
public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map<String, Object> localVariables) {
|
public SecondStageRunContext(PipStage contextDef,Integer childCount, PipelineRunContext parentContext, Map<String, Object> localVariables) {
|
||||||
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), localVariables, new ConcurrentHashMap<>());
|
super(contextDef,childCount, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), localVariables, new ConcurrentHashMap<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -12,10 +12,10 @@ import java.util.Map;
|
|||||||
|
|
||||||
public class TaskRunContext extends BaseRunContext{
|
public class TaskRunContext extends BaseRunContext{
|
||||||
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext,Map<String,Object> localVariable) {
|
public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext,Map<String,Object> localVariable) {
|
||||||
super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(),localVariable, new HashMap<>());
|
super(contextDef,0, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(),localVariable, new HashMap<>());
|
||||||
}
|
}
|
||||||
private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
|
private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
|
||||||
super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
|
super(contextDef,0, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -17,7 +17,6 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|||||||
|
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
|
|
||||||
public abstract class BaseWorker implements Runnable{
|
public abstract class BaseWorker implements Runnable{
|
||||||
// 一些属性
|
// 一些属性
|
||||||
@Resource
|
@Resource
|
||||||
@ -36,10 +35,12 @@ public abstract class BaseWorker implements Runnable{
|
|||||||
BaseRunContext context = contextManager.getContext(contextKey);
|
BaseRunContext context = contextManager.getContext(contextKey);
|
||||||
if (context instanceof TaskRunContext taskRunContext){
|
if (context instanceof TaskRunContext taskRunContext){
|
||||||
try {
|
try {
|
||||||
|
taskRunContext.changeContextState(ContextStateEnum.READY);
|
||||||
taskRunContext.changeContextState(ContextStateEnum.RUNNING);
|
taskRunContext.changeContextState(ContextStateEnum.RUNNING);
|
||||||
execute(taskRunContext);
|
execute(taskRunContext);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
|
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
// TODO 执行结束修改context的state,并且通知父类
|
// TODO 执行结束修改context的state,并且通知父类
|
||||||
taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING);
|
taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING);
|
||||||
|
@ -0,0 +1,22 @@
|
|||||||
|
package cd.casic.ci.process.engine.worker;
|
||||||
|
|
||||||
|
import cd.casic.ci.common.pipeline.annotation.Plugin;
|
||||||
|
import cd.casic.ci.process.engine.runContext.TaskRunContext;
|
||||||
|
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Plugin(taskType = "Github")
|
||||||
|
public class TestGitWorker extends BaseWorker{
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(TaskRunContext context) {
|
||||||
|
PipBaseElement contextDef = context.getContextDef();
|
||||||
|
String id = contextDef.getId();
|
||||||
|
log.info("==============触发worker执行========");
|
||||||
|
log.info("==========运行context:{}===========", JSON.toJSONString(context));
|
||||||
|
}
|
||||||
|
}
|
@ -9,7 +9,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|||||||
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Plugin(taskType = "testTask")
|
@Plugin(taskType = "test")
|
||||||
public class TestWorker extends BaseWorker{
|
public class TestWorker extends BaseWorker{
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,6 +6,8 @@ import cd.casic.ci.common.pipeline.req.pipeline.PipelineReq;
|
|||||||
import cd.casic.ci.common.pipeline.req.pipeline.PipelineUpdateReq;
|
import cd.casic.ci.common.pipeline.req.pipeline.PipelineUpdateReq;
|
||||||
import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp;
|
import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp;
|
||||||
import cd.casic.ci.common.pipeline.utils.PageResult;
|
import cd.casic.ci.common.pipeline.utils.PageResult;
|
||||||
|
import cd.casic.ci.process.engine.executor.PipelineExecutor;
|
||||||
|
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
|
||||||
import cd.casic.ci.process.process.service.pipeline.PipelineService;
|
import cd.casic.ci.process.process.service.pipeline.PipelineService;
|
||||||
import cd.casic.framework.commons.pojo.CommonResult;
|
import cd.casic.framework.commons.pojo.CommonResult;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
@ -32,6 +34,8 @@ public class PipelineController {
|
|||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private PipelineService pipelineService;
|
private PipelineService pipelineService;
|
||||||
|
@Resource
|
||||||
|
private PipelineExecutor pipelineExecutor;
|
||||||
|
|
||||||
@PermitAll
|
@PermitAll
|
||||||
@PostMapping(path="/createPipeline")
|
@PostMapping(path="/createPipeline")
|
||||||
@ -96,4 +100,9 @@ public class PipelineController {
|
|||||||
|
|
||||||
return CommonResult.success();
|
return CommonResult.success();
|
||||||
}
|
}
|
||||||
|
@PostMapping("/executePipeline")
|
||||||
|
public CommonResult<PipelineRunContext> executePipeline(String pipelineId){
|
||||||
|
PipelineRunContext execute = pipelineExecutor.execute(pipelineId);
|
||||||
|
return CommonResult.success(execute);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,11 +41,11 @@ public class RedisMqTest {
|
|||||||
pipTask.setStageId("testStage");
|
pipTask.setStageId("testStage");
|
||||||
PipPipeline pipeline = new PipPipeline();
|
PipPipeline pipeline = new PipPipeline();
|
||||||
pipeline.setId("testPipeline");
|
pipeline.setId("testPipeline");
|
||||||
PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline);
|
PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline,1);
|
||||||
PipStage stage = new PipStage();
|
PipStage stage = new PipStage();
|
||||||
stage.setId("testStage");
|
stage.setId("testStage");
|
||||||
stage.setParentId("testPipeline");
|
stage.setParentId("testPipeline");
|
||||||
SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>());
|
SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,1,pipelineRunContext,new ConcurrentHashMap<>());
|
||||||
TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext,new HashMap<>());
|
TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext,new HashMap<>());
|
||||||
contextManager.contextRegister(pipelineRunContext);
|
contextManager.contextRegister(pipelineRunContext);
|
||||||
contextManager.contextRegister(secondStageRunContext);
|
contextManager.contextRegister(secondStageRunContext);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user