From e3fc790948ad897c32a3b55fa2ad48c3b00a13eb Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Mon, 19 May 2025 12:37:32 +0800 Subject: [PATCH] =?UTF-8?q?test2=E9=80=9A=E8=BF=87=EF=BC=88=E5=A6=82?= =?UTF-8?q?=E6=9E=9C=E6=AD=A3=E7=A1=AE=E6=B3=A8=E5=86=8Ccontext=E5=B0=B1?= =?UTF-8?q?=E8=83=BD=E6=89=BE=E5=88=B0worker=E6=89=A7=E8=A1=8C=EF=BC=8C?= =?UTF-8?q?=E8=BF=98=E5=B7=AE=E5=88=86=E5=8F=91=E9=80=BB=E8=BE=91=E6=9A=82?= =?UTF-8?q?=E6=97=B6=E6=B2=A1=E6=B5=8B=E8=AF=95=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../manager/impl/DefaultWorkerManager.java | 14 +++-- .../engine/runContext/TaskRunContext.java | 2 +- .../ci/process/engine/worker/BaseWorker.java | 4 -- .../process/converter/PipelineConverter.java | 3 +- .../java/cd/casic/server/RedisMqTest.java | 61 +++++++++++++++++++ 5 files changed, 73 insertions(+), 11 deletions(-) create mode 100644 ops-server/src/test/java/cd/casic/server/RedisMqTest.java diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java index 584316c4..2ca1b724 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java @@ -68,11 +68,15 @@ public class DefaultWorkerManager extends AbstractRedisStreamMessageListener toRespList(List pipPipelines); diff --git a/ops-server/src/test/java/cd/casic/server/RedisMqTest.java b/ops-server/src/test/java/cd/casic/server/RedisMqTest.java new file mode 100644 index 00000000..9510ac9d --- /dev/null +++ b/ops-server/src/test/java/cd/casic/server/RedisMqTest.java @@ -0,0 +1,61 @@ +package cd.casic.server; + +import cd.casic.ci.process.engine.manager.RunContextManager; +import cd.casic.ci.process.engine.message.TaskRunMessage; +import cd.casic.ci.process.engine.runContext.PipelineRunContext; +import cd.casic.ci.process.engine.runContext.SecondStageRunContext; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; +import cd.casic.ci.process.process.dataObject.stage.PipStage; +import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.framework.mq.redis.core.RedisMQTemplate; +import jakarta.annotation.Resource; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; + +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; + +@SpringBootTest(classes = {OpsServerApplication.class}) +@ActiveProfiles("local") +public class RedisMqTest { + @Resource + RedisMQTemplate redisMQTemplate; + @Resource + RunContextManager contextManager; + @Test + public void test01(){ + System.out.println("h w !"); + redisMQTemplate.send(new TaskRunMessage()); + } + + @Test + public void test02(){ + System.out.println("h w !"); + TaskRunMessage taskRunMessage = new TaskRunMessage(); + PipTask pipTask = new PipTask(); + pipTask.setTaskType("testTask"); + pipTask.setId("testTaskId"); + pipTask.setStageId("testStage"); + PipPipeline pipeline = new PipPipeline(); + pipeline.setId("testPipeline"); + PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline); + PipStage stage = new PipStage(); + stage.setId("testStage"); + stage.setParentId("testPipeline"); + SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>()); + TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext); + contextManager.contextRegister(pipelineRunContext); + contextManager.contextRegister(secondStageRunContext); + contextManager.contextRegister(taskRunContext); + taskRunMessage.setTask(pipTask); + redisMQTemplate.send(taskRunMessage); + try { + Thread.sleep(5000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +}