This commit is contained in:
HopeLi 2025-05-26 20:11:44 +08:00
parent 54377af6df
commit b8170bc955
7 changed files with 394 additions and 0 deletions

View File

@ -0,0 +1,45 @@
package cd.casic.ci.process.engine.scheduler;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.executor.PipelineExecutor;
import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import cd.casic.ci.process.engine.scheduler.service.impl.PipelineSchedulingPropertiesServiceImpl;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingBootstrapper
* @Date: 2025/5/26 17:28
* @Description:
*/
@Component
@RequiredArgsConstructor
public class PipelineSchedulingBootstrapper {
@Resource
private PipelineSchedulingPropertiesServiceImpl taskService;
@Resource
private PipelineSchedulerConfig taskScheduler;
@Resource
private PipelineExecutor pipelineExecutor;
@PostConstruct
public void init() {
List<PipelineSchedulingProperties> tasks = taskService.getAllTasks();
for (PipelineSchedulingProperties task : tasks) {
if (ContextStateEnum.RUNNING.getCode().equals(Integer.parseInt(task.getStatus()))) {
taskScheduler.addTask(task.getPipelineId(), ()->{
pipelineExecutor.execute(task.getPipelineId());
}, task.getCronExpression());
}
}
}
}

View File

@ -0,0 +1,146 @@
package cd.casic.ci.process.engine.scheduler.config;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulerConfig
* @Date: 2025/5/26 10:55
* @Description:
*/
@Configuration
@EnableScheduling
@Slf4j
public class PipelineSchedulerConfig implements SchedulingConfigurer {
@Resource
private ScheduledTaskRegistrar taskRegistrar;
private final Map<String, ScheduledTask> taskFutures = new HashMap<>();
private final Map<String, CronTask> cronTaskMap = new HashMap<>();
@Override
public void configureTasks(@NotNull ScheduledTaskRegistrar taskRegistrar) {
this.taskRegistrar = taskRegistrar;
}
public void addTask(String taskId, Runnable task, String cronExpression) {
CronTask cronTask = new CronTask(task, cronExpression);
ScheduledTask scheduledTask = taskRegistrar.scheduleCronTask(cronTask);
taskFutures.put(taskId, scheduledTask);
cronTaskMap.put(taskId, cronTask);
log.info("任务 [{}] 已添加cron: {}", taskId, cronExpression);
}
public void removeTask(String taskId) {
ScheduledTask scheduledTask = taskFutures.get(taskId);
if (scheduledTask != null) {
ScheduledFuture<?> future = getFuture(scheduledTask);
if (future != null) {
future.cancel(true);
}
taskFutures.remove(taskId);
cronTaskMap.remove(taskId);
log.info("任务 [{}] 已删除", taskId);
}
}
public void updateTask(String taskId, String cronExpression) {
removeTask(taskId);
CronTask oldTask = cronTaskMap.get(taskId);
if (oldTask == null) {
throw new IllegalArgumentException("未找到对应的任务: " + taskId);
}
addTask(taskId, oldTask.getRunnable(), cronExpression);
log.info("任务 [{}] 已更新,新 cron: {}", taskId, cronExpression);
}
public void startTask(String taskId) {
ScheduledTask scheduledTask = taskFutures.get(taskId);
if (scheduledTask == null) {
log.warn("任务 [{}] 不存在", taskId);
return;
}
ScheduledFuture<?> future = getFuture(scheduledTask);
if (future != null && !future.isCancelled()) {
log.info("任务 [{}] 正在运行中,无需重启", taskId);
return;
}
CronTask cronTask = cronTaskMap.get(taskId);
if (cronTask == null) {
log.warn("未找到对应的任务定义: {}", taskId);
return;
}
ScheduledTask newTask = taskRegistrar.scheduleCronTask(cronTask);
taskFutures.put(taskId, newTask);
log.info("任务 [{}] 已重启", taskId);
}
/**
* 停止一个任务
*/
public void stopTask(String taskId) {
removeTask(taskId);
log.info("任务 [{}] 已停止", taskId);
}
/**
* 获取 ScheduledTask 中的 future 字段
*/
private ScheduledFuture<?> getFuture(ScheduledTask scheduledTask) {
try {
Field field = scheduledTask.getClass().getDeclaredField("future");
field.setAccessible(true);
return (ScheduledFuture<?>) field.get(scheduledTask);
} catch (Exception e) {
log.error("反射获取 future 失败: {}", e.getMessage());
return null;
}
}
/**
* 判断任务是否已取消
*/
public boolean isTaskCancelled(String taskId) {
ScheduledTask scheduledTask = taskFutures.get(taskId);
if (scheduledTask == null) return true;
ScheduledFuture<?> future = getFuture(scheduledTask);
return future != null && future.isCancelled();
}
@Component
public static class PipelineSchedulerTask implements Runnable {
@Override
public void run() {
// 定时任务逻辑代码
System.out.println("Pipeline Task: " + new Date());
}
}
}

View File

@ -0,0 +1,54 @@
package cd.casic.ci.process.engine.scheduler.controller;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService;
import cd.casic.ci.process.process.dataObject.base.BaseIdReq;
import jakarta.annotation.Resource;
import jakarta.validation.Valid;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingPropertiesController
* @Date: 2025/5/26 16:56
* @Description:
*/
@RestController
@RequestMapping("/scheduling")
public class PipelineSchedulingPropertiesController {
@Resource
private PipelineSchedulingPropertiesService pipelineSchedulingPropertiesService;
@PostMapping("/getAll")
public List<PipelineSchedulingProperties> getAll() {
return pipelineSchedulingPropertiesService.getAllTasks();
}
@PostMapping("/create")
public void create(@RequestBody @Valid PipelineSchedulingProperties task) {
pipelineSchedulingPropertiesService.addTask(task);
}
@PostMapping("/update")
public void update(@RequestBody @Valid PipelineSchedulingProperties task) {
pipelineSchedulingPropertiesService.updateTask(task);
}
@PostMapping("/delete")
public void delete(@RequestBody @Valid BaseIdReq req) {
pipelineSchedulingPropertiesService.deleteTask(req.getId());
}
@PostMapping("/start")
public void start(@RequestBody @Valid BaseIdReq req) {
pipelineSchedulingPropertiesService.startTask(req.getId());
}
@PostMapping("/stop")
public void stop(@RequestBody @Valid BaseIdReq req) {
pipelineSchedulingPropertiesService.stopTask(req.getId());
}
}

View File

@ -0,0 +1,14 @@
package cd.casic.ci.process.engine.scheduler.dao;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import cd.casic.framework.mybatis.core.mapper.BaseMapperX;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingPropertiesDao
* @Date: 2025/5/26 16:31
* @Description:
*/
public interface PipelineSchedulingPropertiesDao extends BaseMapperX<PipelineSchedulingProperties> {
}

View File

@ -0,0 +1,37 @@
package cd.casic.ci.process.engine.scheduler.dateObject;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingProperties
* @Date: 2025/5/26 16:27
* @Description:
*/
@Data
@TableName("pip_pipeline_scheduling_properties")
public class PipelineSchedulingProperties extends PipBaseElement {
/**
* 任务唯一标识
*/
private String pipelineId;
/**
* 任务名称
*/
private String pipelineName;
/**
* 定时表达式
*/
private String cronExpression;
/**
* 执行状态
*/
private String status;
}

View File

@ -0,0 +1,27 @@
package cd.casic.ci.process.engine.scheduler.service;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import jakarta.validation.Valid;
import java.util.List;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingPropertiesService
* @Date: 2025/5/26 16:48
* @Description:
*/
public interface PipelineSchedulingPropertiesService {
List<PipelineSchedulingProperties> getAllTasks();
void addTask(@Valid PipelineSchedulingProperties task);
void updateTask(@Valid PipelineSchedulingProperties task);
void deleteTask(String id);
void startTask(String id);
void stopTask(String id);
}

View File

@ -0,0 +1,71 @@
package cd.casic.ci.process.engine.scheduler.service.impl;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig;
import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingPropertiesServiceImpl
* @Date: 2025/5/26 16:49
* @Description:
*/
@Service
@Slf4j
public class PipelineSchedulingPropertiesServiceImpl extends ServiceImpl<PipelineSchedulingPropertiesDao, PipelineSchedulingProperties> implements PipelineSchedulingPropertiesService {
@Resource
private PipelineSchedulingPropertiesDao taskRepository;
@Resource
private PipelineSchedulerConfig taskScheduler;
public List<PipelineSchedulingProperties> getAllTasks() {
return taskRepository.selectList(null);
}
public void addTask(PipelineSchedulingProperties task) {
if (ContextStateEnum.RUNNING.getCode().equals(Integer.parseInt(task.getStatus()))) {
taskScheduler.addTask(task.getPipelineId(), () -> {
},task.getCronExpression());
}
taskRepository.insert(task);
}
public void updateTask(PipelineSchedulingProperties task) {
PipelineSchedulingProperties old = taskRepository.selectById(task.getId());
if (old != null) {
taskScheduler.updateTask(old.getPipelineId(), task.getCronExpression());
}
taskRepository.updateById(task);
}
public void deleteTask(String taskId) {
taskScheduler.removeTask(taskId);
taskRepository.delete(new QueryWrapper<PipelineSchedulingProperties>().eq("task_id", taskId));
}
public void startTask(String taskId) {
taskScheduler.startTask(taskId);
taskRepository.update(null, new UpdateWrapper<PipelineSchedulingProperties>()
.set("status", "RUNNING").eq("task_id", taskId));
}
public void stopTask(String taskId) {
taskScheduler.stopTask(taskId);
taskRepository.update(null, new UpdateWrapper<PipelineSchedulingProperties>()
.set("status", "STOPPED").eq("task_id", taskId));
}
}