串行分发器,添加阻塞逻辑

This commit is contained in:
even 2025-05-19 15:12:51 +08:00
parent 4280521a3c
commit 889026d53c
4 changed files with 11 additions and 5 deletions

View File

@ -60,7 +60,7 @@ public class ParallelDispatcher implements BaseDispatcher{
// 等待当前阶段执行
latch.await();
}
// 入库
}
@Override
public void run() {

View File

@ -38,7 +38,7 @@ public class SerialDispatcher implements BaseDispatcher {
}
@Override
public void dispatch() {
public void dispatch() throws InterruptedException {
for (PipTask pipTask : taskList) {
// 注册taskContext且发送消息至消息队列给work执行, 如果需要则传入参数
TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>());
@ -46,7 +46,13 @@ public class SerialDispatcher implements BaseDispatcher {
taskRunContext.changeContextState(ContextStateEnum.READY);
TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
redisMQTemplate.send(taskRunMessage);
// TODO 监听当前taskContext状态
// TODO 监听当前taskContext状态变成执行成功或者执行失败(worker当中改变状态为运行中执行成功执行失败)
//
AtomicInteger state = taskRunContext.getState();
while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()) {
Thread.sleep(1000L);
}
}
}

View File

@ -61,7 +61,7 @@ public abstract class BaseRunContext {
callParentChange(stateEnum);
}
}
// 保证一直都操作同一个引用的值
private void setState(AtomicInteger state) {
this.state = state;
}

View File

@ -46,7 +46,7 @@ public class RedisMqTest {
stage.setId("testStage");
stage.setParentId("testPipeline");
SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>());
TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext);
TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext,new HashMap<>());
contextManager.contextRegister(pipelineRunContext);
contextManager.contextRegister(secondStageRunContext);
contextManager.contextRegister(taskRunContext);