test2通过(如果正确注册context就能找到worker执行,还差分发逻辑暂时没测试)

This commit is contained in:
even 2025-05-19 12:37:32 +08:00
parent 5101584b68
commit e3fc790948
5 changed files with 73 additions and 11 deletions

View File

@ -68,11 +68,15 @@ public class DefaultWorkerManager extends AbstractRedisStreamMessageListener<Tas
@Override
public void onMessage(TaskRunMessage message) {
log.info("===============接收到消息================");
PipTask task = message.getTask();
String taskType = task.getTaskType();
BaseWorker baseWorker = taskTypeWorkerMap.get(taskType);
baseWorker.setContextKey(task.getId());
workerExecutor.execute(baseWorker);
try {
PipTask task = message.getTask();
String taskType = task.getTaskType();
BaseWorker baseWorker = taskTypeWorkerMap.get(taskType);
baseWorker.setContextKey(task.getId());
workerExecutor.execute(baseWorker);
}catch (Exception e){
// TODO 后期可以考虑专门整一个队列
}
}
@Override

View File

@ -23,7 +23,7 @@ public class TaskRunContext extends BaseRunContext{
* */
@Override
public BaseRunContext getRunContext(String id) {
if (!StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) {
if (StringUtils.isEmpty(id)||!id.equals(this.getContextDef().getId())) {
return null;
}
return this;

View File

@ -24,10 +24,6 @@ public abstract class BaseWorker implements Runnable{
private RunContextManager contextManager;
private String contextKey;
@PostConstruct
public void initName(){
}
@Override
public void run() {
if (StringUtils.isEmpty(contextKey)) {

View File

@ -3,6 +3,8 @@ package cd.casic.ci.process.process.converter;
import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.util.List;
@ -17,7 +19,6 @@ import java.util.List;
@Mapper(componentModel = "spring")
public interface PipelineConverter {
PipelineConverter INSTANCE = Mappers.getMapper(PipelineConverter.class);
PipelineFindResp toResp(PipPipeline pipPipeline);
List<PipelineFindResp> toRespList(List<PipPipeline> pipPipelines);

View File

@ -0,0 +1,61 @@
package cd.casic.server;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.message.TaskRunMessage;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
@SpringBootTest(classes = {OpsServerApplication.class})
@ActiveProfiles("local")
public class RedisMqTest {
@Resource
RedisMQTemplate redisMQTemplate;
@Resource
RunContextManager contextManager;
@Test
public void test01(){
System.out.println("h w !");
redisMQTemplate.send(new TaskRunMessage());
}
@Test
public void test02(){
System.out.println("h w !");
TaskRunMessage taskRunMessage = new TaskRunMessage();
PipTask pipTask = new PipTask();
pipTask.setTaskType("testTask");
pipTask.setId("testTaskId");
pipTask.setStageId("testStage");
PipPipeline pipeline = new PipPipeline();
pipeline.setId("testPipeline");
PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline);
PipStage stage = new PipStage();
stage.setId("testStage");
stage.setParentId("testPipeline");
SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>());
TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext);
contextManager.contextRegister(pipelineRunContext);
contextManager.contextRegister(secondStageRunContext);
contextManager.contextRegister(taskRunContext);
taskRunMessage.setTask(pipTask);
redisMQTemplate.send(taskRunMessage);
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}