From 8c6011fcb456dab0ae2f9f3a218ede9f9b6153f1 Mon Sep 17 00:00:00 2001 From: HopeLi <1278288511@qq.com> Date: Tue, 27 May 2025 21:25:23 +0800 Subject: [PATCH 1/2] 0527 ljc --- .../PipelineSchedulingBootstrapper.java | 40 +++-- .../config/PipelineSchedulerConfig.java | 150 ----------------- .../engine/scheduler/config/QuartzConfig.java | 39 +++++ .../config/QuartzSchedulerManager.java | 155 ++++++++++++++++++ .../PipelineSchedulingController.java | 52 ++++++ ...ipelineSchedulingPropertiesController.java | 54 ------ .../PipelineSchedulingProperties.java | 11 +- .../enums/PipelineSchedulingStatusEnum.java | 56 +++++++ .../scheduler/job/PipelineSchedulingJob.java | 33 ++++ .../listener/QuartzStartupListener.java | 30 ++++ .../scheduler/req/PipelineSchedulingReq.java | 20 +++ .../PipelineSchedulingPropertiesService.java | 27 --- ...pelineSchedulingPropertiesServiceImpl.java | 71 -------- .../scheduler/trigger/SchedulingTrigger.java | 19 --- .../pipeline/impl/PipelineServiceImpl.java | 104 ++++++------ .../src/main/resources/application-local.yaml | 2 +- .../src/main/resources/application.yaml | 30 ++++ 17 files changed, 493 insertions(+), 400 deletions(-) delete mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/PipelineSchedulerConfig.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/QuartzConfig.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/QuartzSchedulerManager.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingController.java delete mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingPropertiesController.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/enums/PipelineSchedulingStatusEnum.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/listener/QuartzStartupListener.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/req/PipelineSchedulingReq.java delete mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/PipelineSchedulingPropertiesService.java delete mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/impl/PipelineSchedulingPropertiesServiceImpl.java delete mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/trigger/SchedulingTrigger.java diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/PipelineSchedulingBootstrapper.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/PipelineSchedulingBootstrapper.java index f0dddc8c..72fbcac3 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/PipelineSchedulingBootstrapper.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/PipelineSchedulingBootstrapper.java @@ -1,14 +1,15 @@ package cd.casic.ci.process.engine.scheduler; -import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.executor.PipelineExecutor; -import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig; +import cd.casic.ci.process.engine.scheduler.config.QuartzSchedulerManager; +import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao; import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; -import cd.casic.ci.process.engine.scheduler.service.impl.PipelineSchedulingPropertiesServiceImpl; +import cd.casic.ci.process.engine.scheduler.req.PipelineSchedulingReq; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.util.List; @@ -16,29 +17,32 @@ import java.util.List; * @author HopeLi * @version v1.0 * @ClassName PipelineSchedulingBootstrapper - * @Date: 2025/5/26 17:28 - * @Description: + * @Date: 2025/5/27 17:46 + * @Description: */ + @Component @RequiredArgsConstructor public class PipelineSchedulingBootstrapper { - @Resource - private PipelineSchedulingPropertiesServiceImpl taskService; + private final QuartzSchedulerManager quartzSchedulerManager; + private final PipelineExecutor pipelineExecutor; @Resource - private PipelineSchedulerConfig pipelineSchedulerConfig; - - @Resource - private PipelineExecutor pipelineExecutor; + private PipelineSchedulingPropertiesDao pipelineSchedulingPropertiesDao; @PostConstruct - public void init() { - List tasks = taskService.getAllTasks(); - for (PipelineSchedulingProperties task : tasks) { - if (ContextStateEnum.RUNNING.getCode().equals(Integer.parseInt(task.getStatus()))) { - pipelineSchedulerConfig.addTask(task.getPipelineId(), ()->{ - pipelineExecutor.execute(task.getPipelineId()); - }, task.getCronExpression()); + public void init(){ + List tasks = pipelineSchedulingPropertiesDao.selectList(); + if (!CollectionUtils.isEmpty(tasks)){ + for (PipelineSchedulingProperties task : tasks) { + try { + PipelineSchedulingReq req = new PipelineSchedulingReq(); + req.setPipelineId(task.getPipelineId()); + req.setCron(task.getCron()); + quartzSchedulerManager.addTask(req); + } catch (Exception e) { + e.printStackTrace(); + } } } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/PipelineSchedulerConfig.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/PipelineSchedulerConfig.java deleted file mode 100644 index eb47ae38..00000000 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/PipelineSchedulerConfig.java +++ /dev/null @@ -1,150 +0,0 @@ -package cd.casic.ci.process.engine.scheduler.config; - -import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.SchedulingConfigurer; -import org.springframework.scheduling.config.CronTask; -import org.springframework.scheduling.config.ScheduledTask; -import org.springframework.scheduling.config.ScheduledTaskRegistrar; -import org.springframework.stereotype.Component; - -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ScheduledFuture; - -/** - * @author HopeLi - * @version v1.0 - * @ClassName PipelineSchedulerConfig - * @Date: 2025/5/26 10:55 - * @Description: - */ - -@Configuration -@EnableScheduling -@Slf4j -@Component("pipelineSchedulerConfig") -public class PipelineSchedulerConfig implements SchedulingConfigurer { - private ScheduledTaskRegistrar taskRegistrar; - - private final Map taskFutures = new HashMap<>(); - - private final Map cronTaskMap = new HashMap<>(); - - @Override - public void configureTasks(@NotNull ScheduledTaskRegistrar taskRegistrar) { - this.taskRegistrar = taskRegistrar; - log.info("ScheduledTaskRegistrar 初始化完成"); - } - - - public void addTask(String taskId, Runnable task, String cronExpression) { - if (taskRegistrar == null) { - log.error("【定时任务失败】taskRegistrar 尚未初始化,请检查调度器是否已启动"); - return; - } - - CronTask cronTask = new CronTask(task, cronExpression); - ScheduledTask scheduledTask = taskRegistrar.scheduleCronTask(cronTask); - taskFutures.put(taskId, scheduledTask); - cronTaskMap.put(taskId, cronTask); - - log.info("任务 [{}] 已添加,cron: {}", taskId, cronExpression); - } - - public void removeTask(String taskId) { - ScheduledTask scheduledTask = taskFutures.get(taskId); - if (scheduledTask != null) { - ScheduledFuture future = getFuture(scheduledTask); - if (future != null) { - future.cancel(true); - } - taskFutures.remove(taskId); - cronTaskMap.remove(taskId); - - log.info("任务 [{}] 已删除", taskId); - } - } - - public void updateTask(String taskId, String cronExpression) { - removeTask(taskId); - CronTask oldTask = cronTaskMap.get(taskId); - if (oldTask == null) { - throw new IllegalArgumentException("未找到对应的任务: " + taskId); - } - addTask(taskId, oldTask.getRunnable(), cronExpression); - log.info("任务 [{}] 已更新,新 cron: {}", taskId, cronExpression); - } - - public void startTask(String taskId) { - ScheduledTask scheduledTask = taskFutures.get(taskId); - if (scheduledTask == null) { - log.warn("任务 [{}] 不存在", taskId); - return; - } - - ScheduledFuture future = getFuture(scheduledTask); - if (future != null && !future.isCancelled()) { - log.info("任务 [{}] 正在运行中,无需重启", taskId); - return; - } - - CronTask cronTask = cronTaskMap.get(taskId); - if (cronTask == null) { - log.warn("未找到对应的任务定义: {}", taskId); - return; - } - - ScheduledTask newTask = taskRegistrar.scheduleCronTask(cronTask); - taskFutures.put(taskId, newTask); - log.info("任务 [{}] 已重启", taskId); - } - - /** - * 停止一个任务 - */ - public void stopTask(String taskId) { - removeTask(taskId); - log.info("任务 [{}] 已停止", taskId); - } - - - /** - * 获取 ScheduledTask 中的 future 字段 - */ - private ScheduledFuture getFuture(ScheduledTask scheduledTask) { - try { - Field field = scheduledTask.getClass().getDeclaredField("future"); - field.setAccessible(true); - return (ScheduledFuture) field.get(scheduledTask); - } catch (Exception e) { - log.error("反射获取 future 失败: {}", e.getMessage()); - return null; - } - } - - - /** - * 判断任务是否已取消 - */ - public boolean isTaskCancelled(String taskId) { - ScheduledTask scheduledTask = taskFutures.get(taskId); - if (scheduledTask == null) return true; - - ScheduledFuture future = getFuture(scheduledTask); - return future != null && future.isCancelled(); - } - -// @Component -// public static class PipelineSchedulerTask implements Runnable { -// -// @Override -// public void run() { -// // 定时任务逻辑代码 -// System.out.println("Pipeline Task: " + new Date()); -// } -// } -} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/QuartzConfig.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/QuartzConfig.java new file mode 100644 index 00000000..36071493 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/QuartzConfig.java @@ -0,0 +1,39 @@ +package cd.casic.ci.process.engine.scheduler.config; + +import org.quartz.Scheduler; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.scheduling.quartz.SpringBeanJobFactory; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName QuartzConfig + * @Date: 2025/5/27 21:16 + * @Description: + */ + +@Configuration +public class QuartzConfig { + private final ApplicationContext applicationContext; + + public QuartzConfig(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + @Bean + public Scheduler scheduler() throws Exception { + SpringBeanJobFactory jobFactory = new SpringBeanJobFactory(); + jobFactory.setApplicationContext(applicationContext); + + SchedulerFactoryBean factory = new SchedulerFactoryBean(); + factory.setJobFactory(jobFactory); + factory.afterPropertiesSet(); + + Scheduler scheduler = factory.getScheduler(); + scheduler.start(); + return scheduler; + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/QuartzSchedulerManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/QuartzSchedulerManager.java new file mode 100644 index 00000000..d6312555 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/config/QuartzSchedulerManager.java @@ -0,0 +1,155 @@ +package cd.casic.ci.process.engine.scheduler.config; + +import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao; +import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; +import cd.casic.ci.process.engine.scheduler.job.PipelineSchedulingJob; +import cd.casic.ci.process.engine.scheduler.req.PipelineSchedulingReq; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.quartz.*; +import org.quartz.impl.matchers.GroupMatcher; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.quartz.JobBuilder.newJob; +import static org.quartz.TriggerBuilder.newTrigger; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName QuartzSchedulerManager + * @Date: 2025/5/27 17:31 + * @Description: + */ + +@Component +@Slf4j +public class QuartzSchedulerManager { + private static final String JOB_GROUP_NAME = "PIPELINE_JOB_GROUP"; + private static final String TRIGGER_GROUP_NAME = "PIPELINE_TRIGGER_GROUP"; + + @Resource + private Scheduler scheduler; + + @Resource + private PipelineSchedulingPropertiesDao pipelineSchedulingPropertiesDao; + + @PostConstruct + public void init() throws SchedulerException { + log.info("Quartz 调度器初始化完成"); + } + + /** + * 添加一个定时任务 + */ + public void addTask(PipelineSchedulingReq req) throws Exception { + JobDetail jobDetail = newJob(PipelineSchedulingJob.class) + .withIdentity(req.getPipelineId(), JOB_GROUP_NAME) + .usingJobData(PipelineSchedulingJob.JOB_PARAM_PIPELINE_ID, req.getPipelineId()) + .build(); + + Trigger trigger = newTrigger() + .withIdentity(req.getPipelineId(), TRIGGER_GROUP_NAME) + .withSchedule(CronScheduleBuilder.cronSchedule(req.getCron())) + .build(); + + scheduler.scheduleJob(jobDetail, trigger); + log.info("任务 [{}] 已添加,cron: {}", req.getPipelineId(), req.getCron()); + + QueryWrapper wrapper = new QueryWrapper<>(); + wrapper.eq("pipeline_id", req.getPipelineId()); + wrapper.eq("cron", req.getCron()); + List list = pipelineSchedulingPropertiesDao.selectList(wrapper); + + //为空则数据入库,否则仅为重新添加任务进调度器 + if (!CollectionUtils.isEmpty(list)){ + PipelineSchedulingProperties pipelineSchedulingProperties = new PipelineSchedulingProperties(); + pipelineSchedulingProperties.setPipelineId(req.getPipelineId()); + pipelineSchedulingProperties.setCron(req.getCron()); + pipelineSchedulingPropertiesDao.insert(pipelineSchedulingProperties); +// pipelineSchedulingProperties.setStatus(PipelineSchedulingStatusEnum.READY.getCode()); + } + + } + + /** + * 删除任务 + */ + public void removeTask(PipelineSchedulingReq req) throws SchedulerException { + TriggerKey triggerKey = TriggerKey.triggerKey(req.getPipelineId(), TRIGGER_GROUP_NAME); + scheduler.pauseTrigger(triggerKey); + scheduler.unscheduleJob(triggerKey); + scheduler.deleteJob(JobKey.jobKey(req.getPipelineId(), JOB_GROUP_NAME)); + log.info("任务 [{}] 已删除", req.getPipelineId()); + + //数据删除 + QueryWrapper wrapper = new QueryWrapper<>(); + wrapper.eq("pipeline_id", req.getPipelineId()); + wrapper.eq("cron", req.getCron()); + List list = pipelineSchedulingPropertiesDao.selectList(wrapper); + + if (!CollectionUtils.isEmpty(list)){ + List idList = list.stream().map(PipelineSchedulingProperties::getId).toList(); + pipelineSchedulingPropertiesDao.deleteByIds(idList); + } + } + + /** + * 更新任务 + */ + public void updateTask(PipelineSchedulingReq req) throws Exception { + removeTask(req); + addTask(req); + log.info("任务 [{}] 已更新,新 cron: {}", req.getPipelineId(), req.getCron()); + } + + /** + * 启动/重启任务 + */ + public void startTask(PipelineSchedulingReq req) throws SchedulerException { + JobKey jobKey = JobKey.jobKey(req.getPipelineId(), JOB_GROUP_NAME); + if (!scheduler.checkExists(jobKey)) { + log.warn("任务 [{}] 不存在", req.getPipelineId()); + return; + } + + if (scheduler.isShutdown()) { + scheduler.start(); + } + + List triggers = scheduler.getTriggersOfJob(jobKey); + for (Trigger trigger : triggers) { + if (trigger.getNextFireTime() == null) { + scheduler.rescheduleJob(TriggerKey.triggerKey(req.getPipelineId(), TRIGGER_GROUP_NAME), trigger); + } + } + + log.info("任务 [{}] 已重启", req.getPipelineId()); + } + + /** + * 判断任务是否已停止 + */ + public boolean isTaskPaused(PipelineSchedulingReq req) throws SchedulerException { + JobKey jobKey = JobKey.jobKey(req.getPipelineId(), JOB_GROUP_NAME); + return !scheduler.isStarted() || scheduler.getJobDetail(jobKey) == null; + } + + /** + * 获取所有任务 ID + */ + public Set getAllTaskIds() throws SchedulerException { + Set jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(JOB_GROUP_NAME)); + Set taskIds = new HashSet<>(); + for (JobKey jobKey : jobKeys) { + taskIds.add(jobKey.getName()); + } + return taskIds; + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingController.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingController.java new file mode 100644 index 00000000..0b8c00ba --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingController.java @@ -0,0 +1,52 @@ +package cd.casic.ci.process.engine.scheduler.controller; + +import cd.casic.ci.process.engine.scheduler.config.QuartzSchedulerManager; +import cd.casic.ci.process.engine.scheduler.req.PipelineSchedulingReq; +import cd.casic.framework.commons.pojo.CommonResult; +import jakarta.annotation.Resource; +import jakarta.validation.Valid; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineTaskController + * @Date: 2025/5/27 19:18 + * @Description: + */ + + +@RestController +@RequestMapping("/scheduler") +public class PipelineSchedulingController { + + @Resource + private QuartzSchedulerManager quartzSchedulerManager; + + @PostMapping("/add") + public CommonResult add(@RequestBody @Valid PipelineSchedulingReq req) throws Exception { + quartzSchedulerManager.addTask(req); + return CommonResult.success(); + } + + @PostMapping("/remove") + public CommonResult remove(@RequestBody @Valid PipelineSchedulingReq req) throws Exception { + quartzSchedulerManager.removeTask(req); + return CommonResult.success(); + } + + @PostMapping("/update") + public CommonResult update(@RequestBody @Valid PipelineSchedulingReq req) throws Exception { + quartzSchedulerManager.updateTask(req); + return CommonResult.success(); + } + + @PostMapping("/start") + public CommonResult start(@RequestBody @Valid PipelineSchedulingReq req) throws Exception { + quartzSchedulerManager.startTask(req); + return CommonResult.success(); + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingPropertiesController.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingPropertiesController.java deleted file mode 100644 index 49855176..00000000 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/controller/PipelineSchedulingPropertiesController.java +++ /dev/null @@ -1,54 +0,0 @@ -package cd.casic.ci.process.engine.scheduler.controller; - -import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; -import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService; -import cd.casic.ci.process.process.dataObject.base.BaseIdReq; -import jakarta.annotation.Resource; -import jakarta.validation.Valid; -import org.springframework.web.bind.annotation.*; - -import java.util.List; - -/** - * @author HopeLi - * @version v1.0 - * @ClassName PipelineSchedulingPropertiesController - * @Date: 2025/5/26 16:56 - * @Description: - */ -@RestController -@RequestMapping("/scheduling") -public class PipelineSchedulingPropertiesController { - @Resource - private PipelineSchedulingPropertiesService pipelineSchedulingPropertiesService; - - @PostMapping("/getAll") - public List getAll() { - return pipelineSchedulingPropertiesService.getAllTasks(); - } - - @PostMapping("/create") - public void create(@RequestBody @Valid PipelineSchedulingProperties task) { - pipelineSchedulingPropertiesService.addTask(task); - } - - @PostMapping("/update") - public void update(@RequestBody @Valid PipelineSchedulingProperties task) { - pipelineSchedulingPropertiesService.updateTask(task); - } - - @PostMapping("/delete") - public void delete(@RequestBody @Valid BaseIdReq req) { - pipelineSchedulingPropertiesService.deleteTask(req.getId()); - } - - @PostMapping("/start") - public void start(@RequestBody @Valid BaseIdReq req) { - pipelineSchedulingPropertiesService.startTask(req.getId()); - } - - @PostMapping("/stop") - public void stop(@RequestBody @Valid BaseIdReq req) { - pipelineSchedulingPropertiesService.stopTask(req.getId()); - } -} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dateObject/PipelineSchedulingProperties.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dateObject/PipelineSchedulingProperties.java index 7c376a4b..6413c0e8 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dateObject/PipelineSchedulingProperties.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/dateObject/PipelineSchedulingProperties.java @@ -9,8 +9,8 @@ import lombok.EqualsAndHashCode; * @author HopeLi * @version v1.0 * @ClassName PipelineSchedulingProperties - * @Date: 2025/5/26 16:27 - * @Description: + * @Date: 2025/5/27 19:57 + * @Description: */ @EqualsAndHashCode(callSuper = true) @@ -22,15 +22,10 @@ public class PipelineSchedulingProperties extends PipBaseElement { */ private String pipelineId; - /** - * 任务名称 - */ - private String pipelineName; - /** * 定时表达式 */ - private String cronExpression; + private String cron; /** * 执行状态 diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/enums/PipelineSchedulingStatusEnum.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/enums/PipelineSchedulingStatusEnum.java new file mode 100644 index 00000000..dbe08311 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/enums/PipelineSchedulingStatusEnum.java @@ -0,0 +1,56 @@ +package cd.casic.ci.process.engine.scheduler.enums; + +import lombok.Getter; + +import java.util.EnumMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +@Getter +public enum PipelineSchedulingStatusEnum { + INIT("0","初始化"), + READY("1","就绪"), + RUNNING("2","运行"), + SUSPEND("3","挂起"), + STOP("-1","停止"), + HAPPY_ENDING("4","执行成功"), + BAD_ENDING("5","执行失败") + ; + + private String code; + private String msg; + private static final Map> TRANSITIONS = new EnumMap<>(PipelineSchedulingStatusEnum.class); + + static { + TRANSITIONS.put(INIT, Set.of(READY, SUSPEND, BAD_ENDING, STOP)); + TRANSITIONS.put(READY, Set.of(READY,RUNNING, SUSPEND, BAD_ENDING, STOP)); + TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP)); + TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP)); + //...初始化其他状态转移关系 + } + + PipelineSchedulingStatusEnum(String code, String msg) { + this.code = code; + this.msg = msg; + + } + public static Boolean canGoto(PipelineSchedulingStatusEnum from, PipelineSchedulingStatusEnum to){ + try { + if (Objects.isNull(from) || Objects.isNull(to)) { + return false; + } + return TRANSITIONS.get(from).contains(to); + } catch (Exception e){ + return false; + } + } + public static PipelineSchedulingStatusEnum getByCode(String code){ + for (PipelineSchedulingStatusEnum value : values()) { + if (value.getCode().equals(code)) { + return value; + } + } + return null; + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java new file mode 100644 index 00000000..01d80a81 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java @@ -0,0 +1,33 @@ +package cd.casic.ci.process.engine.scheduler.job; + +import cd.casic.ci.process.engine.executor.PipelineExecutor; +import jakarta.annotation.Resource; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineTaskJob + * @Date: 2025/5/27 17:28 + * @Description: + */ +public class PipelineSchedulingJob implements Job { + public static final String JOB_PARAM_PIPELINE_ID = "pipelineId"; + public static final String JOB_PARAM_EXECUTOR = "executor"; + + @Resource + private PipelineExecutor executor; + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + JobDataMap dataMap = context.getMergedJobDataMap(); + String pipelineId = dataMap.getString(JOB_PARAM_PIPELINE_ID); + + if (pipelineId != null && executor != null) { + executor.execute(pipelineId); + } + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/listener/QuartzStartupListener.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/listener/QuartzStartupListener.java new file mode 100644 index 00000000..26af6d5c --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/listener/QuartzStartupListener.java @@ -0,0 +1,30 @@ +package cd.casic.ci.process.engine.scheduler.listener; + +import lombok.RequiredArgsConstructor; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName QuartzStartupListener + * @Date: 2025/5/27 20:42 + * @Description: + */ + +@Component +@RequiredArgsConstructor +public class QuartzStartupListener { + private final Scheduler scheduler; + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationReady() throws SchedulerException { + if (!scheduler.isStarted()) { + scheduler.start(); + System.out.println("Quartz 调度器已手动启动"); + } + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/req/PipelineSchedulingReq.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/req/PipelineSchedulingReq.java new file mode 100644 index 00000000..4f54f31e --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/req/PipelineSchedulingReq.java @@ -0,0 +1,20 @@ +package cd.casic.ci.process.engine.scheduler.req; + +import lombok.Data; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName PipelineSchedulingReq + * @Date: 2025/5/27 19:24 + * @Description: + */ +@Data +public class PipelineSchedulingReq { + + private String pipelineId; + + private String cron; + + private String status; +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/PipelineSchedulingPropertiesService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/PipelineSchedulingPropertiesService.java deleted file mode 100644 index c27035dd..00000000 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/PipelineSchedulingPropertiesService.java +++ /dev/null @@ -1,27 +0,0 @@ -package cd.casic.ci.process.engine.scheduler.service; - -import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; -import jakarta.validation.Valid; - -import java.util.List; - -/** - * @author HopeLi - * @version v1.0 - * @ClassName PipelineSchedulingPropertiesService - * @Date: 2025/5/26 16:48 - * @Description: - */ -public interface PipelineSchedulingPropertiesService { - List getAllTasks(); - - void addTask(@Valid PipelineSchedulingProperties task); - - void updateTask(@Valid PipelineSchedulingProperties task); - - void deleteTask(String id); - - void startTask(String id); - - void stopTask(String id); -} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/impl/PipelineSchedulingPropertiesServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/impl/PipelineSchedulingPropertiesServiceImpl.java deleted file mode 100644 index cc96209c..00000000 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/service/impl/PipelineSchedulingPropertiesServiceImpl.java +++ /dev/null @@ -1,71 +0,0 @@ -package cd.casic.ci.process.engine.scheduler.service.impl; - -import cd.casic.ci.process.engine.enums.ContextStateEnum; -import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig; -import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao; -import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties; -import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import java.util.List; - -/** - * @author HopeLi - * @version v1.0 - * @ClassName PipelineSchedulingPropertiesServiceImpl - * @Date: 2025/5/26 16:49 - * @Description: - */ - -@Service -@Slf4j -public class PipelineSchedulingPropertiesServiceImpl extends ServiceImpl implements PipelineSchedulingPropertiesService { - - @Resource - private PipelineSchedulingPropertiesDao taskRepository; - - @Resource - private PipelineSchedulerConfig pipelineSchedulerConfig; - - public List getAllTasks() { - return taskRepository.selectList(null); - } - - public void addTask(PipelineSchedulingProperties task) { - if (ContextStateEnum.RUNNING.getCode() == Integer.parseInt(task.getStatus())) { - pipelineSchedulerConfig.addTask(task.getPipelineId(), () -> { - },task.getCronExpression()); - } - taskRepository.insert(task); - } - - public void updateTask(PipelineSchedulingProperties task) { - PipelineSchedulingProperties old = taskRepository.selectById(task.getId()); - if (old != null) { - pipelineSchedulerConfig.updateTask(old.getPipelineId(), task.getCronExpression()); - } - taskRepository.updateById(task); - } - - public void deleteTask(String taskId) { - pipelineSchedulerConfig.removeTask(taskId); - taskRepository.delete(new QueryWrapper().eq("task_id", taskId)); - } - - public void startTask(String taskId) { - pipelineSchedulerConfig.startTask(taskId); - taskRepository.update(null, new UpdateWrapper() - .set("status", String.valueOf(ContextStateEnum.RUNNING.getCode())).eq("task_id", taskId)); - } - - public void stopTask(String taskId) { - pipelineSchedulerConfig.stopTask(taskId); - taskRepository.update(null, new UpdateWrapper() - .set("status", String.valueOf(ContextStateEnum.STOP.getCode())).eq("task_id", taskId)); - } -} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/trigger/SchedulingTrigger.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/trigger/SchedulingTrigger.java deleted file mode 100644 index 59496fb6..00000000 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/trigger/SchedulingTrigger.java +++ /dev/null @@ -1,19 +0,0 @@ -package cd.casic.ci.process.engine.scheduler.trigger; - -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.annotation.Scheduled; - -/** - * @author HopeLi - * @version v1.0 - * @ClassName SchedulingTrigger - * @Date: 2025/5/27 15:05 - * @Description: - */ -@Configuration -public class SchedulingTrigger { - @Scheduled(fixedRate = 5000) - public void triggerScheduler() { - // 用于触发调度器初始化 - } -} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/pipeline/impl/PipelineServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/pipeline/impl/PipelineServiceImpl.java index 50d4d1d2..0ff31d43 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/pipeline/impl/PipelineServiceImpl.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/pipeline/impl/PipelineServiceImpl.java @@ -4,7 +4,6 @@ import cd.casic.ci.common.pipeline.req.pipeline.PipelineCreateReq; import cd.casic.ci.common.pipeline.req.pipeline.PipelineQueryReq; import cd.casic.ci.common.pipeline.req.pipeline.PipelineReq; import cd.casic.ci.common.pipeline.req.pipeline.PipelineUpdateReq; -import cd.casic.ci.common.pipeline.resp.context.SingletonRunContextResp; import cd.casic.ci.common.pipeline.resp.context.TreeRunContextResp; import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp; import cd.casic.ci.common.pipeline.resp.stage.StageResp; @@ -143,7 +142,7 @@ public class PipelineServiceImpl extends ServiceImpl i childTask11.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); childTask11.setTaskName("串行任务-1-1"); childTask11.setPipelineId(pipeline.getId()); - childTask11.setTaskType("test"); + childTask11.setTaskType("code"); childTask11.setTaskSort(1); childTask11.setStageId(childStage1.getId()); taskService.save(childTask11); @@ -176,31 +175,31 @@ public class PipelineServiceImpl extends ServiceImpl i childTask21.setTaskName("串行任务-2-1"); childTask21.setCreateTime(LocalDateTime.now()); childTask21.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); - childTask21.setTaskType("test"); + childTask21.setTaskType("TEST_CASE_GENERATION"); childTask21.setTaskSort(1); childTask21.setStageId(childStage21.getId()); taskService.save(childTask21); //------------------------------------------------------------------ - PipStage childStage22 = new PipStage(); - childStage22.setStageName("并行阶段-2-2"); - childStage22.setPipelineId(pipeline.getId()); - childStage22.setCreateTime(LocalDateTime.now()); - childStage22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); - childStage22.setStageSort(2); - childStage22.setCode(false); - childStage22.setParentId(stageReq2.getId()); - stageService.save(childStage22); - - PipTask childTask22 = new PipTask(); - childTask22.setCreateTime(LocalDateTime.now()); - childTask22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); - childTask22.setTaskName("串行任务-2-2"); - childTask22.setPipelineId(pipeline.getId()); - childTask22.setTaskType("test"); - childTask22.setTaskSort(2); - childTask22.setStageId(childStage22.getId()); - taskService.save(childTask22); +// PipStage childStage22 = new PipStage(); +// childStage22.setStageName("并行阶段-2-2"); +// childStage22.setPipelineId(pipeline.getId()); +// childStage22.setCreateTime(LocalDateTime.now()); +// childStage22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); +// childStage22.setStageSort(2); +// childStage22.setCode(false); +// childStage22.setParentId(stageReq2.getId()); +// stageService.save(childStage22); +// +// PipTask childTask22 = new PipTask(); +// childTask22.setCreateTime(LocalDateTime.now()); +// childTask22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); +// childTask22.setTaskName("串行任务-2-2"); +// childTask22.setPipelineId(pipeline.getId()); +// childTask22.setTaskType("test"); +// childTask22.setTaskSort(2); +// childTask22.setStageId(childStage22.getId()); +// taskService.save(childTask22); //创建第三个阶段 PipStage stageReq3 = new PipStage(); @@ -228,41 +227,42 @@ public class PipelineServiceImpl extends ServiceImpl i childTask31.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); childTask31.setTaskName("串行任务-3-1"); childTask31.setPipelineId(pipeline.getId()); - childTask31.setTaskType("test"); + childTask31.setTaskType("AFL"); childTask31.setTaskSort(1); childTask31.setStageId(childStage31.getId()); taskService.save(childTask31); //创建第四个阶段 - PipStage stageReq4 = new PipStage(); - stageReq4.setPipelineId(pipeline.getId()); - stageReq4.setStageName("阶段-4"); - stageReq4.setCreateTime(LocalDateTime.now()); - stageReq4.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); - stageReq4.setStageSort(4); - stageReq4.setParentId("-1"); - stageReq4.setCode(true); - stageService.save(stageReq4); - - PipStage childStage41 = new PipStage(); - childStage41.setStageName("并行任务-4-1"); - childStage41.setPipelineId(pipeline.getId()); - childStage41.setCreateTime(LocalDateTime.now()); - childStage41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); - childStage41.setStageSort(1); - childStage41.setCode(false); - childStage41.setParentId(stageReq4.getId()); - stageService.save(childStage41); - - PipTask childTask41 = new PipTask(); - childTask41.setCreateTime(LocalDateTime.now()); - childTask41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); - childTask41.setTaskName("串行任务-4-1"); - childTask41.setPipelineId(pipeline.getId()); - childTask41.setTaskType("test"); - childTask41.setTaskSort(1); - childTask41.setStageId(childStage41.getId()); - taskService.save(childTask41); + //TODO +// PipStage stageReq4 = new PipStage(); +// stageReq4.setPipelineId(pipeline.getId()); +// stageReq4.setStageName("阶段-4"); +// stageReq4.setCreateTime(LocalDateTime.now()); +// stageReq4.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); +// stageReq4.setStageSort(4); +// stageReq4.setParentId("-1"); +// stageReq4.setCode(true); +// stageService.save(stageReq4); +// +// PipStage childStage41 = new PipStage(); +// childStage41.setStageName("并行任务-4-1"); +// childStage41.setPipelineId(pipeline.getId()); +// childStage41.setCreateTime(LocalDateTime.now()); +// childStage41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); +// childStage41.setStageSort(1); +// childStage41.setCode(false); +// childStage41.setParentId(stageReq4.getId()); +// stageService.save(childStage41); +// +// PipTask childTask41 = new PipTask(); +// childTask41.setCreateTime(LocalDateTime.now()); +// childTask41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId())); +// childTask41.setTaskName("串行任务-4-1"); +// childTask41.setPipelineId(pipeline.getId()); +// childTask41.setTaskType("REPORT"); +// childTask41.setTaskSort(1); +// childTask41.setStageId(childStage41.getId()); +// taskService.save(childTask41); //TODO 创建对应的鉴权关系 //TODO 创建对应的消息分发 diff --git a/ops-server/src/main/resources/application-local.yaml b/ops-server/src/main/resources/application-local.yaml index b2fdaf29..2237153b 100644 --- a/ops-server/src/main/resources/application-local.yaml +++ b/ops-server/src/main/resources/application-local.yaml @@ -70,7 +70,7 @@ spring: # Redis 配置。Redisson 默认的配置足够使用,一般不需要进行调优 data: redis: - host: 192.168.1.120 # 地址 + host: 127.0.0.1 # 地址 port: 6379 # 端口 database: 0 # 数据库索引 # password: dev # 密码,建议生产环境开启 diff --git a/ops-server/src/main/resources/application.yaml b/ops-server/src/main/resources/application.yaml index 6eb957c3..9af903c0 100644 --- a/ops-server/src/main/resources/application.yaml +++ b/ops-server/src/main/resources/application.yaml @@ -183,3 +183,33 @@ debug: false mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + + +# Quartz 配置项,对应 QuartzProperties 配置类 +spring: + quartz: + auto-startup: true # 本地开发环境,尽量不要开启 Job + scheduler-name: schedulerName # Scheduler 名字。默认为 schedulerName + job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。 + wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true + properties: # 添加 Quartz Scheduler 附加属性,更多可以看 http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/configuration.html 文档 + org: + quartz: + # Scheduler 相关配置 + scheduler: + instanceName: schedulerName + instanceId: AUTO # 自动生成 instance ID + # JobStore 相关配置 + jobStore: + # JobStore 实现类。可见博客:https://blog.csdn.net/weixin_42458219/article/details/122247162 + class: org.springframework.scheduling.quartz.LocalDataSourceJobStore + isClustered: true # 是集群模式 + clusterCheckinInterval: 15000 # 集群检查频率,单位:毫秒。默认为 15000,即 15 秒 + misfireThreshold: 60000 # misfire 阀值,单位:毫秒。 + # 线程池相关配置 + threadPool: + threadCount: 25 # 线程池大小。默认为 10 。 + threadPriority: 5 # 线程优先级 + class: org.quartz.simpl.SimpleThreadPool # 线程池类型 + jdbc: # 使用 JDBC 的 JobStore 的时候,JDBC 的配置 + initialize-schema: NEVER # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。 From c049f3b9dc1b6fbe29949b82fde905eabdd5c0ca Mon Sep 17 00:00:00 2001 From: HopeLi <1278288511@qq.com> Date: Tue, 27 May 2025 21:26:17 +0800 Subject: [PATCH 2/2] 0527 ljc --- .../ci/process/engine/scheduler/job/PipelineSchedulingJob.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java index 01d80a81..d193202f 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java @@ -16,7 +16,6 @@ import org.quartz.JobExecutionException; */ public class PipelineSchedulingJob implements Job { public static final String JOB_PARAM_PIPELINE_ID = "pipelineId"; - public static final String JOB_PARAM_EXECUTOR = "executor"; @Resource private PipelineExecutor executor;