From d22268f9a54d39053acdcb93112e1b57024e9352 Mon Sep 17 00:00:00 2001
From: even <827656971@qq.com>
Date: Tue, 15 Apr 2025 20:05:16 +0800
Subject: [PATCH] =?UTF-8?q?=E7=9B=91=E5=90=AC=E5=99=A8=E7=B1=BB?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
modules/module-ci-event/pom.xml | 4 +
.../java/cd/casic/ci/event/pojo/IEvent.java | 3 +-
.../ProjectCallbackEventListener.java | 3 +-
.../engine/control/CallBackControl.java | 2 +-
.../listener/MQPipelineCreateListener.java | 27 ++++
.../listener/MQPipelineDeleteListener.java | 26 ++++
.../listener/MQPipelineRestoreListener.java | 8 ++
.../MQPipelineStreamEnabledListener.java | 8 ++
.../listener/MQPipelineUpdateListener.java | 8 ++
.../pojo/event/PipelineCreateEvent.java | 58 +++++++++
.../service/AgentPipelineRefService.java | 4 +
.../PipelineAtomStatisticsService.java | 39 ++++++
.../engine/service/PipelineGroupService.java | 4 +
.../service/PipelineRuntimeService.java | 4 +
.../service/PipelineWebhookService.java | 118 ++++++++++++++++++
.../service/RepoPipelineRefService.java | 4 +
16 files changed, 316 insertions(+), 4 deletions(-)
rename modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/{ => process}/engine/control/CallBackControl.java (99%)
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineCreateListener.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineDeleteListener.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineRestoreListener.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineStreamEnabledListener.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineUpdateListener.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/pojo/event/PipelineCreateEvent.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/AgentPipelineRefService.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineAtomStatisticsService.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineGroupService.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineRuntimeService.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineWebhookService.java
create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/RepoPipelineRefService.java
diff --git a/modules/module-ci-event/pom.xml b/modules/module-ci-event/pom.xml
index d65726e..da0b078 100644
--- a/modules/module-ci-event/pom.xml
+++ b/modules/module-ci-event/pom.xml
@@ -20,6 +20,10 @@
cd.casic.boot
module-ci-common-pipeline
+
+ cd.casic.boot
+ spring-boot-starter-mq
+
\ No newline at end of file
diff --git a/modules/module-ci-event/src/main/java/cd/casic/ci/event/pojo/IEvent.java b/modules/module-ci-event/src/main/java/cd/casic/ci/event/pojo/IEvent.java
index 7d7cad1..ceffcaa 100644
--- a/modules/module-ci-event/src/main/java/cd/casic/ci/event/pojo/IEvent.java
+++ b/modules/module-ci-event/src/main/java/cd/casic/ci/event/pojo/IEvent.java
@@ -1,5 +1,6 @@
package cd.casic.ci.event.pojo;
+import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import com.mysql.cj.MessageBuilder;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -19,7 +20,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
-public class IEvent {
+public class IEvent extends AbstractRedisStreamMessage {
private int delayMills = 0;
private int retryTime = 1;
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/callback/listener/ProjectCallbackEventListener.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/callback/listener/ProjectCallbackEventListener.java
index 3b074b1..4217511 100644
--- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/callback/listener/ProjectCallbackEventListener.java
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/callback/listener/ProjectCallbackEventListener.java
@@ -1,7 +1,7 @@
package cd.casic.ci.process.process.callback.listener;
-import cd.casic.ci.process.engine.control.CallBackControl;
+import cd.casic.ci.process.process.engine.control.CallBackControl;
import cd.casic.ci.project.dal.pojo.ProjectCreateInfo;
import cd.casic.ci.project.dal.pojo.ProjectVO;
import cd.casic.ci.project.pojo.ProjectUpdateInfo;
@@ -14,7 +14,6 @@ import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListene
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/control/CallBackControl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/control/CallBackControl.java
similarity index 99%
rename from modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/control/CallBackControl.java
rename to modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/control/CallBackControl.java
index 7166cc1..ba98da7 100644
--- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/control/CallBackControl.java
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/control/CallBackControl.java
@@ -1,4 +1,4 @@
-package cd.casic.ci.process.engine.control;
+package cd.casic.ci.process.process.engine.control;
import cd.casic.ci.common.pipeline.Model;
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineCreateListener.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineCreateListener.java
new file mode 100644
index 0000000..dc92d40
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineCreateListener.java
@@ -0,0 +1,27 @@
+package cd.casic.ci.process.process.engine.listener;
+
+import cd.casic.ci.process.process.engine.control.CallBackControl;
+import cd.casic.ci.process.process.engine.pojo.event.PipelineCreateEvent;
+import cd.casic.ci.process.process.engine.service.AgentPipelineRefService;
+import cd.casic.ci.process.process.engine.service.PipelineAtomStatisticsService;
+import cd.casic.ci.process.process.engine.service.PipelineWebhookService;
+import cd.casic.ci.process.process.engine.service.RepoPipelineRefService;
+import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
+import org.springframework.stereotype.Service;
+/**
+ * MQ实现的流水线创建事件
+ *
+ * @version 1.0
+ */
+@Service
+public class MQPipelineCreateListener extends AbstractRedisStreamMessageListener {
+ private PipelineWebhookService pipelineWebhookService;
+ private PipelineAtomStatisticsService pipelineAtomStatisticsService;
+ private CallBackControl callBackControl;
+ private AgentPipelineRefService agentPipelineRefService;
+ private RepoPipelineRefService repoPipelineRefService;
+ @Override
+ public void onMessage(PipelineCreateEvent message) {
+
+ }
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineDeleteListener.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineDeleteListener.java
new file mode 100644
index 0000000..29f3687
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineDeleteListener.java
@@ -0,0 +1,26 @@
+package cd.casic.ci.process.process.engine.listener;
+
+import cd.casic.ci.process.process.engine.control.CallBackControl;
+import cd.casic.ci.process.process.engine.pojo.event.PipelineCreateEvent;
+import cd.casic.ci.process.process.engine.service.*;
+import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
+import org.springframework.stereotype.Service;
+/**
+ * MQ实现的流水线删除事件
+ *
+ * @version 1.0
+ */
+@Service
+public class MQPipelineDeleteListener extends AbstractRedisStreamMessageListener {
+ private PipelineRuntimeService pipelineRuntimeService;
+ private PipelineWebhookService pipelineWebhookService;
+ private PipelineGroupService pipelineGroupService;
+ private PipelineAtomStatisticsService pipelineAtomStatisticsService;
+ private CallBackControl callBackControl;
+ private AgentPipelineRefService agentPipelineRefService;
+ private RepoPipelineRefService repoPipelineRefService;
+ @Override
+ public void onMessage(PipelineCreateEvent message) {
+
+ }
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineRestoreListener.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineRestoreListener.java
new file mode 100644
index 0000000..3fdd38f
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineRestoreListener.java
@@ -0,0 +1,8 @@
+package cd.casic.ci.process.process.engine.listener;
+/**
+ * MQ实现的流水线恢复事件
+ *
+ * @version 1.0
+ */
+public class MQPipelineRestoreListener {
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineStreamEnabledListener.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineStreamEnabledListener.java
new file mode 100644
index 0000000..2c973ea
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineStreamEnabledListener.java
@@ -0,0 +1,8 @@
+package cd.casic.ci.process.process.engine.listener;
+/**
+ * MQ实现的流水线开启Stream事件
+ *
+ * @version 1.0
+ */
+public class MQPipelineStreamEnabledListener {
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineUpdateListener.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineUpdateListener.java
new file mode 100644
index 0000000..e7c7109
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/listener/MQPipelineUpdateListener.java
@@ -0,0 +1,8 @@
+package cd.casic.ci.process.process.engine.listener;
+/**
+ * MQ实现的流水线更新事件
+ *
+ * @version 1.0
+ */
+public class MQPipelineUpdateListener {
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/pojo/event/PipelineCreateEvent.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/pojo/event/PipelineCreateEvent.java
new file mode 100644
index 0000000..6652b5f
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/pojo/event/PipelineCreateEvent.java
@@ -0,0 +1,58 @@
+package cd.casic.ci.process.process.engine.pojo.event;
+
+
+
+
+import cd.casic.ci.common.pipeline.pojo.BuildNo;
+import cd.casic.ci.common.pipeline.pojo.element.Element;
+import cd.casic.ci.event.enums.ActionType;
+import cd.casic.ci.event.pojo.pipeline.IPipelineEvent;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import java.util.Map;
+
+/**
+ * 创建流水线事件
+ *
+ * @version 1.0
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class PipelineCreateEvent extends IPipelineEvent {
+
+ private BuildNo buildNo;
+ private String pipelineName;
+ private Element element;
+ private Integer version;
+ private Map variables;
+
+ public PipelineCreateEvent(
+ String source,
+ String projectId,
+ String pipelineId,
+ String userId,
+ BuildNo buildNo,
+ String pipelineName,
+ Element element,
+ Integer version,
+ Map variables,
+ ActionType actionType,
+ int delayMills
+ ) {
+ super(actionType, source, projectId, pipelineId, userId, delayMills);
+ this.buildNo = buildNo;
+ this.pipelineName = pipelineName;
+ this.element = element;
+ this.version = version;
+ this.variables = variables;
+ }
+
+ public PipelineCreateEvent(
+ String source,
+ String projectId,
+ String pipelineId,
+ String userId
+ ) {
+ this(source, projectId, pipelineId, userId, null, null, null, 1, null, ActionType.START, 0);
+ }
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/AgentPipelineRefService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/AgentPipelineRefService.java
new file mode 100644
index 0000000..9a24070
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/AgentPipelineRefService.java
@@ -0,0 +1,4 @@
+package cd.casic.ci.process.process.engine.service;
+
+public interface AgentPipelineRefService {
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineAtomStatisticsService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineAtomStatisticsService.java
new file mode 100644
index 0000000..6db3a39
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineAtomStatisticsService.java
@@ -0,0 +1,39 @@
+package cd.casic.ci.process.process.engine.service;
+
+import java.util.Set;
+
+/**
+ * 流水线插件统计相关的服务
+ * @version 1.0
+ */
+public interface PipelineAtomStatisticsService {
+
+ /**
+ * 更新插件对应的流水线数量
+ */
+ void updateAtomPipelineNum(
+ String projectId,
+ String pipelineId,
+ Integer version,
+ Boolean deleteFlag,
+ Boolean restoreFlag
+ );
+
+ /**
+ * 添加流水线数量更新
+ */
+ void addPipelineNumUpdate(
+ Set modelVersionAtomSet,
+// List pipelineNumUpdateList,
+ Boolean incrementFlag
+ );
+
+ /**
+ * 获取版本模型字符串
+ */
+ String getVersionModelString(
+ String projectId,
+ String pipelineId,
+ Integer version
+ );
+}
\ No newline at end of file
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineGroupService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineGroupService.java
new file mode 100644
index 0000000..7c5804e
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineGroupService.java
@@ -0,0 +1,4 @@
+package cd.casic.ci.process.process.engine.service;
+
+public interface PipelineGroupService {
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineRuntimeService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineRuntimeService.java
new file mode 100644
index 0000000..b00b428
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineRuntimeService.java
@@ -0,0 +1,4 @@
+package cd.casic.ci.process.process.engine.service;
+
+public interface PipelineRuntimeService {
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineWebhookService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineWebhookService.java
new file mode 100644
index 0000000..566e051
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/PipelineWebhookService.java
@@ -0,0 +1,118 @@
+package cd.casic.ci.process.process.engine.service;
+
+
+import cd.casic.ci.common.pipeline.Model;
+import cd.casic.ci.common.pipeline.enums.RepositoryConfig;
+import cd.casic.ci.common.pipeline.enums.ScmType;
+import cd.casic.ci.common.pipeline.pojo.element.trigger.enums.CodeEventType;
+import cd.casic.ci.log.scm.dal.pojo.Repository;
+import cd.casic.ci.process.api.process.pojo.webhook.PipelineWebhook;
+import cd.casic.ci.process.api.process.pojo.webhook.WebhookTriggerPipeline;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * 流水线Webhook服务接口
+ */
+public interface PipelineWebhookService {
+
+ /**
+ * 添加Webhook
+ */
+ void addWebhook(
+ String projectId,
+ String pipelineId,
+ Integer version,
+ String userId
+ );
+
+ /**
+ * 注册Webhook
+ */
+ Repository registerWebhook(
+ String projectId,
+ ScmType scmType,
+ RepositoryConfig repositoryConfig,
+ CodeEventType codeEventType,
+ String elementVersion
+ );
+
+ /**
+ * 删除Webhook
+ */
+ Boolean deleteWebhook(
+ String projectId,
+ String pipelineId,
+ String userId
+ );
+
+ /**
+ * 删除指定任务的Webhook
+ */
+ Boolean deleteWebhook(
+ String projectId,
+ String pipelineId,
+ String taskId,
+ String userId
+ );
+
+ /**
+ * 获取模型
+ */
+ Model getModel(
+ String projectId,
+ String pipelineId,
+ Integer version
+ );
+
+ /**
+ * 获取触发流水线列表
+ */
+ List getTriggerPipelines(
+ String name,
+ ScmType repositoryType,
+ List yamlPipelineIds
+ );
+
+ /**
+ * 列出触发流水线
+ */
+ List listTriggerPipeline(
+ String projectId,
+ String repositoryHashId,
+ String eventType
+ );
+
+ /**
+ * 获取项目名称
+ */
+ String getProjectName(String projectName);
+
+ /**
+ * 获取外部名称
+ */
+ String getExternalName(ScmType scmType, String projectName);
+
+ /**
+ * 列出Webhook
+ */
+ List listWebhook(
+ String userId,
+ String projectId,
+ String pipelineId,
+ Integer page,
+ Integer pageSize
+ );
+
+ /**
+ * 获取Webhook
+ */
+ PipelineWebhook get(
+ String projectId,
+ String pipelineId,
+ String repositoryHashId,
+ String eventType
+ );
+}
diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/RepoPipelineRefService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/RepoPipelineRefService.java
new file mode 100644
index 0000000..3992545
--- /dev/null
+++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/engine/service/RepoPipelineRefService.java
@@ -0,0 +1,4 @@
+package cd.casic.ci.process.process.engine.service;
+
+public interface RepoPipelineRefService {
+}