失败状态逻辑更改

This commit is contained in:
even 2025-05-29 10:15:10 +08:00
parent 5be88bf251
commit f0b36fad6d
4 changed files with 40 additions and 4 deletions

View File

@ -60,6 +60,11 @@ public class ParallelDispatcher implements BaseDispatcher{
// 等待当前阶段执行 // 等待当前阶段执行
latch.await(); latch.await();
// TODO 检查是否全部执行成功 目前没有逻辑就是忽略错误 // TODO 检查是否全部执行成功 目前没有逻辑就是忽略错误
// 当前执行失败
while (pipelineRunContext.getState().get() != ContextStateEnum.RUNNING.getCode()) {
// 想办法借助工具类 或者直接wait
pipelineRunContext.pause();
}
} }
} }
@Override @Override

View File

@ -51,7 +51,8 @@ public class SerialDispatcher implements BaseDispatcher {
AtomicInteger state = taskRunContext.getState(); AtomicInteger state = taskRunContext.getState();
while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode() while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()) { && state.get() != ContextStateEnum.BAD_ENDING.getCode()) {
Thread.sleep(1000L); // Thread.sleep(1000L);
taskRunContext.pause();
} }
// //
} }

View File

@ -6,13 +6,16 @@ import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.annotation.Transient; import org.springframework.data.annotation.Transient;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@Data @Data
@Slf4j
public abstract class BaseRunContext { public abstract class BaseRunContext {
/** /**
* 当前上下文的定义 * 当前上下文的定义
@ -46,6 +49,10 @@ public abstract class BaseRunContext {
* */ * */
private Map<String,Object> localVariables; private Map<String,Object> localVariables;
private Map<String,BaseRunContext> childContext; private Map<String,BaseRunContext> childContext;
/**
* 用来在控制其他地方的阻塞放行,countDown为1
* */
private CountDownLatch countDownLatch;
public BaseRunContext(PipBaseElement contextDef,Integer childCount,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetVersionId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) { public BaseRunContext(PipBaseElement contextDef,Integer childCount,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetVersionId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
this.contextDef = contextDef; this.contextDef = contextDef;
@ -81,6 +88,8 @@ public abstract class BaseRunContext {
if (parentContext==null) { if (parentContext==null) {
return; return;
} }
// 如果之前有暂停监听状态的.则停止暂停
unpause();
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
this.endTime=LocalDateTime.now(); this.endTime=LocalDateTime.now();
parentContext.checkChildEnd(); parentContext.checkChildEnd();
@ -96,11 +105,12 @@ public abstract class BaseRunContext {
* */ * */
public void checkChildEnd() throws ServiceException{ public void checkChildEnd() throws ServiceException{
Map<String, BaseRunContext> childContext = getChildContext(); Map<String, BaseRunContext> childContext = getChildContext();
if (childContext.size()!=childCount) {
return;
}
int result = ContextStateEnum.HAPPY_ENDING.getCode(); int result = ContextStateEnum.HAPPY_ENDING.getCode();
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) { for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
if (childContext.size()!=childCount) {
return;
}
BaseRunContext child = entry.getValue(); BaseRunContext child = entry.getValue();
int state = child.getState().get(); int state = child.getState().get();
if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(state)&&!ContextStateEnum.BAD_ENDING.getCode().equals(state)) { if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(state)&&!ContextStateEnum.BAD_ENDING.getCode().equals(state)) {
@ -163,4 +173,23 @@ public abstract class BaseRunContext {
} }
} }
} }
public void pause(){
if (countDownLatch==null) {
synchronized(this) {
if (this.countDownLatch == null) {
this.countDownLatch= new CountDownLatch(1);
}
}
}
try {
this.countDownLatch.await();
} catch (InterruptedException e) {
log.error(e.getMessage());
}
}
private void unpause(){
if (this.countDownLatch!=null) {
this.countDownLatch.countDown();
}
}
} }

View File

@ -68,6 +68,7 @@ public abstract class BaseWorker implements Runnable{
} catch (Exception e) { } catch (Exception e) {
log.error("================worker执行报错",e); log.error("================worker执行报错",e);
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING); taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
append(context,e.getMessage());
return; return;
} }
// TODO 执行结束修改context的state,并且通知父类 // TODO 执行结束修改context的state,并且通知父类