From b8170bc955d97035e1002d8373e1c8b6c86fde23 Mon Sep 17 00:00:00 2001 From: HopeLi <1278288511@qq.com> Date: Mon, 26 May 2025 20:11:44 +0800 Subject: [PATCH] 0526 ljc --- .../PipelineSchedulingBootstrapper.java | 45 ++++++ .../config/PipelineSchedulerConfig.java | 146 ++++++++++++++++++ ...ipelineSchedulingPropertiesController.java | 54 +++++++ .../dao/PipelineSchedulingPropertiesDao.java | 14 ++ .../PipelineSchedulingProperties.java | 37 +++++ .../PipelineSchedulingPropertiesService.java | 27 ++++ ...pelineSchedulingPropertiesServiceImpl.java | 71 +++++++++ 7 files changed, 394 insertions(+) create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/PipelineSchedulingBootstrapper.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/PipelineSchedulerConfig.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingPropertiesController.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dao/PipelineSchedulingPropertiesDao.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dateObject/PipelineSchedulingProperties.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/PipelineSchedulingPropertiesService.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/impl/PipelineSchedulingPropertiesServiceImpl.java diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/PipelineSchedulingBootstrapper.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/PipelineSchedulingBootstrapper.java new file mode 100644 index 00000000..b0631390 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/PipelineSchedulingBootstrapper.java @@ -0,0 +1,45 @@ +package cd.casic.ci.process.engine.scheduler; + +import cd.casic.ci.process.engine.enums.ContextStateEnum; +import cd.casic.ci.process.engine.executor.PipelineExecutor; +import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig; +import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; +import cd.casic.ci.process.engine.scheduler.service.impl.PipelineSchedulingPropertiesServiceImpl; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineSchedulingBootstrapper + * @Date: 2025/5/26 17:28 + * @Description: + */ +@Component +@RequiredArgsConstructor +public class PipelineSchedulingBootstrapper { + @Resource + private PipelineSchedulingPropertiesServiceImpl taskService; + + @Resource + private PipelineSchedulerConfig taskScheduler; + + @Resource + private PipelineExecutor pipelineExecutor; + + @PostConstruct + public void init() { + List tasks = taskService.getAllTasks(); + for (PipelineSchedulingProperties task : tasks) { + if (ContextStateEnum.RUNNING.getCode().equals(Integer.parseInt(task.getStatus()))) { + taskScheduler.addTask(task.getPipelineId(), ()->{ + pipelineExecutor.execute(task.getPipelineId()); + }, task.getCronExpression()); + } + } + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/PipelineSchedulerConfig.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/PipelineSchedulerConfig.java new file mode 100644 index 00000000..466bd545 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/PipelineSchedulerConfig.java @@ -0,0 +1,146 @@ +package cd.casic.ci.process.engine.scheduler.config; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.CronTask; +import org.springframework.scheduling.config.ScheduledTask; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; +import org.springframework.stereotype.Component; + +import java.lang.reflect.Field; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineSchedulerConfig + * @Date: 2025/5/26 10:55 + * @Description: + */ + +@Configuration +@EnableScheduling +@Slf4j +public class PipelineSchedulerConfig implements SchedulingConfigurer { + @Resource + private ScheduledTaskRegistrar taskRegistrar; + + private final Map taskFutures = new HashMap<>(); + + private final Map cronTaskMap = new HashMap<>(); + + @Override + public void configureTasks(@NotNull ScheduledTaskRegistrar taskRegistrar) { + this.taskRegistrar = taskRegistrar; + } + + + public void addTask(String taskId, Runnable task, String cronExpression) { + CronTask cronTask = new CronTask(task, cronExpression); + ScheduledTask scheduledTask = taskRegistrar.scheduleCronTask(cronTask); + taskFutures.put(taskId, scheduledTask); + cronTaskMap.put(taskId, cronTask); + + log.info("任务 [{}] 已添加,cron: {}", taskId, cronExpression); + } + + public void removeTask(String taskId) { + ScheduledTask scheduledTask = taskFutures.get(taskId); + if (scheduledTask != null) { + ScheduledFuture future = getFuture(scheduledTask); + if (future != null) { + future.cancel(true); + } + taskFutures.remove(taskId); + cronTaskMap.remove(taskId); + + log.info("任务 [{}] 已删除", taskId); + } + } + + public void updateTask(String taskId, String cronExpression) { + removeTask(taskId); + CronTask oldTask = cronTaskMap.get(taskId); + if (oldTask == null) { + throw new IllegalArgumentException("未找到对应的任务: " + taskId); + } + addTask(taskId, oldTask.getRunnable(), cronExpression); + log.info("任务 [{}] 已更新,新 cron: {}", taskId, cronExpression); + } + + public void startTask(String taskId) { + ScheduledTask scheduledTask = taskFutures.get(taskId); + if (scheduledTask == null) { + log.warn("任务 [{}] 不存在", taskId); + return; + } + + ScheduledFuture future = getFuture(scheduledTask); + if (future != null && !future.isCancelled()) { + log.info("任务 [{}] 正在运行中,无需重启", taskId); + return; + } + + CronTask cronTask = cronTaskMap.get(taskId); + if (cronTask == null) { + log.warn("未找到对应的任务定义: {}", taskId); + return; + } + + ScheduledTask newTask = taskRegistrar.scheduleCronTask(cronTask); + taskFutures.put(taskId, newTask); + log.info("任务 [{}] 已重启", taskId); + } + + /** + * 停止一个任务 + */ + public void stopTask(String taskId) { + removeTask(taskId); + log.info("任务 [{}] 已停止", taskId); + } + + + /** + * 获取 ScheduledTask 中的 future 字段 + */ + private ScheduledFuture getFuture(ScheduledTask scheduledTask) { + try { + Field field = scheduledTask.getClass().getDeclaredField("future"); + field.setAccessible(true); + return (ScheduledFuture) field.get(scheduledTask); + } catch (Exception e) { + log.error("反射获取 future 失败: {}", e.getMessage()); + return null; + } + } + + + /** + * 判断任务是否已取消 + */ + public boolean isTaskCancelled(String taskId) { + ScheduledTask scheduledTask = taskFutures.get(taskId); + if (scheduledTask == null) return true; + + ScheduledFuture future = getFuture(scheduledTask); + return future != null && future.isCancelled(); + } + + @Component + public static class PipelineSchedulerTask implements Runnable { + + @Override + public void run() { + // 定时任务逻辑代码 + System.out.println("Pipeline Task: " + new Date()); + } + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingPropertiesController.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingPropertiesController.java new file mode 100644 index 00000000..49855176 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingPropertiesController.java @@ -0,0 +1,54 @@ +package cd.casic.ci.process.engine.scheduler.controller; + +import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; +import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService; +import cd.casic.ci.process.process.dataObject.base.BaseIdReq; +import jakarta.annotation.Resource; +import jakarta.validation.Valid; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineSchedulingPropertiesController + * @Date: 2025/5/26 16:56 + * @Description: + */ +@RestController +@RequestMapping("/scheduling") +public class PipelineSchedulingPropertiesController { + @Resource + private PipelineSchedulingPropertiesService pipelineSchedulingPropertiesService; + + @PostMapping("/getAll") + public List getAll() { + return pipelineSchedulingPropertiesService.getAllTasks(); + } + + @PostMapping("/create") + public void create(@RequestBody @Valid PipelineSchedulingProperties task) { + pipelineSchedulingPropertiesService.addTask(task); + } + + @PostMapping("/update") + public void update(@RequestBody @Valid PipelineSchedulingProperties task) { + pipelineSchedulingPropertiesService.updateTask(task); + } + + @PostMapping("/delete") + public void delete(@RequestBody @Valid BaseIdReq req) { + pipelineSchedulingPropertiesService.deleteTask(req.getId()); + } + + @PostMapping("/start") + public void start(@RequestBody @Valid BaseIdReq req) { + pipelineSchedulingPropertiesService.startTask(req.getId()); + } + + @PostMapping("/stop") + public void stop(@RequestBody @Valid BaseIdReq req) { + pipelineSchedulingPropertiesService.stopTask(req.getId()); + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dao/PipelineSchedulingPropertiesDao.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dao/PipelineSchedulingPropertiesDao.java new file mode 100644 index 00000000..a2186a03 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dao/PipelineSchedulingPropertiesDao.java @@ -0,0 +1,14 @@ +package cd.casic.ci.process.engine.scheduler.dao; + +import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; +import cd.casic.framework.mybatis.core.mapper.BaseMapperX; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineSchedulingPropertiesDao + * @Date: 2025/5/26 16:31 + * @Description: + */ +public interface PipelineSchedulingPropertiesDao extends BaseMapperX { +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dateObject/PipelineSchedulingProperties.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dateObject/PipelineSchedulingProperties.java new file mode 100644 index 00000000..2e21872a --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dateObject/PipelineSchedulingProperties.java @@ -0,0 +1,37 @@ +package cd.casic.ci.process.engine.scheduler.dateObject; + +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineSchedulingProperties + * @Date: 2025/5/26 16:27 + * @Description: + */ + +@Data +@TableName("pip_pipeline_scheduling_properties") +public class PipelineSchedulingProperties extends PipBaseElement { + /** + * 任务唯一标识 + */ + private String pipelineId; + + /** + * 任务名称 + */ + private String pipelineName; + + /** + * 定时表达式 + */ + private String cronExpression; + + /** + * 执行状态 + */ + private String status; +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/PipelineSchedulingPropertiesService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/PipelineSchedulingPropertiesService.java new file mode 100644 index 00000000..c27035dd --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/PipelineSchedulingPropertiesService.java @@ -0,0 +1,27 @@ +package cd.casic.ci.process.engine.scheduler.service; + +import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; +import jakarta.validation.Valid; + +import java.util.List; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineSchedulingPropertiesService + * @Date: 2025/5/26 16:48 + * @Description: + */ +public interface PipelineSchedulingPropertiesService { + List getAllTasks(); + + void addTask(@Valid PipelineSchedulingProperties task); + + void updateTask(@Valid PipelineSchedulingProperties task); + + void deleteTask(String id); + + void startTask(String id); + + void stopTask(String id); +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/impl/PipelineSchedulingPropertiesServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/impl/PipelineSchedulingPropertiesServiceImpl.java new file mode 100644 index 00000000..77facf79 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/impl/PipelineSchedulingPropertiesServiceImpl.java @@ -0,0 +1,71 @@ +package cd.casic.ci.process.engine.scheduler.service.impl; + +import cd.casic.ci.process.engine.enums.ContextStateEnum; +import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig; +import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao; +import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; +import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineSchedulingPropertiesServiceImpl + * @Date: 2025/5/26 16:49 + * @Description: + */ + +@Service +@Slf4j +public class PipelineSchedulingPropertiesServiceImpl extends ServiceImpl implements PipelineSchedulingPropertiesService { + + @Resource + private PipelineSchedulingPropertiesDao taskRepository; + + @Resource + private PipelineSchedulerConfig taskScheduler; + + public List getAllTasks() { + return taskRepository.selectList(null); + } + + public void addTask(PipelineSchedulingProperties task) { + if (ContextStateEnum.RUNNING.getCode().equals(Integer.parseInt(task.getStatus()))) { + taskScheduler.addTask(task.getPipelineId(), () -> { + },task.getCronExpression()); + } + taskRepository.insert(task); + } + + public void updateTask(PipelineSchedulingProperties task) { + PipelineSchedulingProperties old = taskRepository.selectById(task.getId()); + if (old != null) { + taskScheduler.updateTask(old.getPipelineId(), task.getCronExpression()); + } + taskRepository.updateById(task); + } + + public void deleteTask(String taskId) { + taskScheduler.removeTask(taskId); + taskRepository.delete(new QueryWrapper().eq("task_id", taskId)); + } + + public void startTask(String taskId) { + taskScheduler.startTask(taskId); + taskRepository.update(null, new UpdateWrapper() + .set("status", "RUNNING").eq("task_id", taskId)); + } + + public void stopTask(String taskId) { + taskScheduler.stopTask(taskId); + taskRepository.update(null, new UpdateWrapper() + .set("status", "STOPPED").eq("task_id", taskId)); + } +}