去除掉roackmq的依赖

This commit is contained in:
mianbin 2025-04-21 20:59:05 +08:00
parent 2b67d8e81d
commit 12b18d0651
12 changed files with 32 additions and 333 deletions

View File

@ -3,6 +3,12 @@
<component name="CopilotChatHistory">
<option name="conversations">
<list>
<Conversation>
<option name="createTime" value="1745238244591" />
<option name="id" value="0196584e3cef74dbaf973b7f37310765" />
<option name="title" value="新对话 2025年4月21日 20:24:04" />
<option name="updateTime" value="1745238244591" />
</Conversation>
<Conversation>
<option name="createTime" value="1744877233212" />
<option name="id" value="019642c9a750700591a53e04460d4d28" />

14
.idea/compiler.xml generated
View File

@ -49,6 +49,20 @@
<module name="spring-boot-starter-security" />
</profile>
</annotationProcessing>
<bytecodeTargetLevel>
<module name="module-ci-common-pipeline" target="17" />
<module name="module-ci-dispatch-api" target="17" />
<module name="module-ci-environment" target="17" />
<module name="module-ci-event" target="17" />
<module name="module-ci-log" target="17" />
<module name="module-ci-market" target="17" />
<module name="module-ci-project" target="17" />
<module name="module-ci-quality" target="17" />
<module name="module-ci-repository" target="17" />
<module name="module-ci-store-api" target="17" />
<module name="module-ci-ticket" target="17" />
<module name="module-ci-worker" target="17" />
</bytecodeTargetLevel>
</component>
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_OVERRIDE">

2
.idea/misc.xml generated
View File

@ -50,6 +50,8 @@
<option value="$PROJECT_DIR$/modules/module-ci-commons/ci-common-public/pom.xml" />
<option value="$PROJECT_DIR$/modules/module-ci-commons/ci-common-pipeline/pom.xml" />
<option value="$PROJECT_DIR$/modules/module-ci-store-api/pom.xml" />
<option value="$PROJECT_DIR$/modules/module-ci-process-biz/pom.xml" />
<option value="$PROJECT_DIR$/modules/module-ci-process-api/pom.xml" />
</list>
</option>
<option name="ignoredFiles">

View File

@ -22,7 +22,6 @@
<json.version>20250107</json.version>
<kingbase.jdbc.version>8.6.0</kingbase.jdbc.version>
<commons-compress.version>1.27.1</commons-compress.version>
<rocketmq-spring.version>2.3.1</rocketmq-spring.version>
<ip2region.version>2.7.0</ip2region.version>
<dynamic-datasource.version>4.3.1</dynamic-datasource.version>
<redisson.version>3.36.0</redisson.version>
@ -35,7 +34,6 @@
<version-number.version>1.12</version-number.version>
<lock4j.version>2.2.7</lock4j.version>
<commons-io.version>2.17.0</commons-io.version>
<logback-core.version>1.2.11</logback-core.version>
<apk-parser.version>2.6.10</apk-parser.version>
<hutool-6.version>6.0.0-M16</hutool-6.version>
<resilience4j-circuitbreaker.version>2.3.0</resilience4j-circuitbreaker.version>
@ -45,7 +43,6 @@
<lombok.version>1.18.34</lombok.version>
<skywalking.version>9.0.0</skywalking.version>
<mockito-inline.version>5.2.0</mockito-inline.version>
<logback-classic.version>1.2.11</logback-classic.version>
<commons-exec.version>1.4.0</commons-exec.version>
<velocity.version>2.4</velocity.version>
<reflections.version>0.10.2</reflections.version>
@ -190,6 +187,16 @@
<artifactId>module-system-biz</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cd.casic.boot</groupId>
<artifactId>module-ci-process-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cd.casic.boot</groupId>
<artifactId>module-ci-process-biz</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.github.mouzt</groupId>
<artifactId>bizlog-sdk</artifactId>
@ -560,16 +567,6 @@
<artifactId>apk-parser</artifactId>
<version>${apk-parser.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback-core.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback-classic.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>

View File

@ -1,46 +0,0 @@
package cd.casic.framework.tenant.core.mq.rocketmq;
import cd.casic.framework.tenant.core.context.TenantContextHolder;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import java.util.List;
import static cd.casic.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* RocketMQ 消息队列的多租户 {@link ConsumeMessageHook} 实现类
*
* Consumer 消费消息时将消息的 Header 的租户编号添加到 {@link TenantContextHolder} 通过 {@link InvocableHandlerMethod} 实现
*
* @author mianbin modified from yudao
*/
public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook {
@Override
public String hookName() {
return getClass().getSimpleName();
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
// 校验消息必须是单条不然设置租户可能不正确
List<MessageExt> messages = context.getMsgList();
Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size());
// 设置租户编号
String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID);
if (StrUtil.isNotEmpty(tenantId)) {
TenantContextHolder.setTenantId(Long.parseLong(tenantId));
}
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
TenantContextHolder.clear();
}
}

View File

@ -1,53 +0,0 @@
package cd.casic.framework.tenant.core.mq.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
/**
* 多租户的 RocketMQ 初始化器
*
* @author mianbin modified from yudao
*/
public class TenantRocketMQInitializer implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DefaultRocketMQListenerContainer) {
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
initTenantConsumer(container.getConsumer());
} else if (bean instanceof RocketMQTemplate) {
RocketMQTemplate template = (RocketMQTemplate) bean;
initTenantProducer(template.getProducer());
}
return bean;
}
private void initTenantProducer(DefaultMQProducer producer) {
if (producer == null) {
return;
}
DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
if (producerImpl == null) {
return;
}
producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
}
private void initTenantConsumer(DefaultMQPushConsumer consumer) {
if (consumer == null) {
return;
}
DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
if (consumerImpl == null) {
return;
}
consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
}
}

View File

@ -1,36 +0,0 @@
package cd.casic.framework.tenant.core.mq.rocketmq;
import cd.casic.framework.tenant.core.context.TenantContextHolder;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import static cd.casic.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* RocketMQ 消息队列的多租户 {@link SendMessageHook} 实现类
*
* Producer 发送消息时 {@link TenantContextHolder} 租户编号添加到消息的 Header
*
* @author mianbin modified from yudao
*/
public class TenantRocketMQSendMessageHook implements SendMessageHook {
@Override
public String hookName() {
return getClass().getSimpleName();
}
@Override
public void sendMessageBefore(SendMessageContext sendMessageContext) {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId == null) {
return;
}
sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString());
}
@Override
public void sendMessageAfter(SendMessageContext sendMessageContext) {
}
}

View File

@ -10,19 +10,11 @@ import cd.casic.framework.websocket.core.security.WebSocketAuthorizeRequestsCust
import cd.casic.framework.websocket.core.sender.kafka.KafkaWebSocketMessageConsumer;
import cd.casic.framework.websocket.core.sender.kafka.KafkaWebSocketMessageSender;
import cd.casic.framework.websocket.core.sender.local.LocalWebSocketMessageSender;
import cd.casic.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer;
import cd.casic.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender;
import cd.casic.framework.websocket.core.sender.redis.RedisWebSocketMessageConsumer;
import cd.casic.framework.websocket.core.sender.redis.RedisWebSocketMessageSender;
import cd.casic.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer;
import cd.casic.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender;
import cd.casic.framework.websocket.core.session.WebSocketSessionHandlerDecorator;
import cd.casic.framework.websocket.core.session.WebSocketSessionManager;
import cd.casic.framework.websocket.core.session.WebSocketSessionManagerImpl;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ -115,54 +107,6 @@ public class OpsWebSocketAutoConfiguration {
}
@Configuration
@ConditionalOnProperty(prefix = "ops.websocket", name = "sender-type", havingValue = "rocketmq")
public class RocketMQWebSocketMessageSenderConfiguration {
@Bean
public RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender(
WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate,
@Value("${ops.websocket.sender-rocketmq.topic}") String topic) {
return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic);
}
@Bean
public RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer(
RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) {
return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender);
}
}
@Configuration
@ConditionalOnProperty(prefix = "ops.websocket", name = "sender-type", havingValue = "rabbitmq")
public class RabbitMQWebSocketMessageSenderConfiguration {
@Bean
public RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender(
WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate,
TopicExchange websocketTopicExchange) {
return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange);
}
@Bean
public RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer(
RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) {
return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender);
}
/**
* 创建 Topic Exchange
*/
@Bean
public TopicExchange websocketTopicExchange(@Value("${ops.websocket.sender-rabbitmq.exchange}") String exchange) {
return new TopicExchange(exchange,
true, // durable: 是否持久化
false); // exclusive: 是否排它
}
}
@Configuration
@ConditionalOnProperty(prefix = "ops.websocket", name = "sender-type", havingValue = "kafka")
public class KafkaWebSocketMessageSenderConfiguration {

View File

@ -1,37 +0,0 @@
package cd.casic.framework.websocket.core.sender.rocketmq;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* RocketMQ 广播 WebSocket 的消息
*
* @author mianbin modified from yudao
*/
@Data
@Accessors(chain = true)
public class RocketMQWebSocketMessage {
/**
* Session 编号
*/
private String sessionId;
/**
* 用户类型
*/
private Integer userType;
/**
* 用户编号
*/
private Long userId;
/**
* 消息类型
*/
private String messageType;
/**
* 消息内容
*/
private String messageContent;
}

View File

@ -1,30 +0,0 @@
package cd.casic.framework.websocket.core.sender.rocketmq;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
/**
* {@link RocketMQWebSocketMessage} 广播消息的消费者真正把消息发送出去
*
* @author mianbin modified from yudao
*/
@RocketMQMessageListener( // 重点添加 @RocketMQMessageListener 注解声明消费的 topic
topic = "${ops.websocket.sender-rocketmq.topic}",
consumerGroup = "${ops.websocket.sender-rocketmq.consumer-group}",
messageModel = MessageModel.BROADCASTING // 设置为广播模式保证每个实例都能收到消息
)
@RequiredArgsConstructor
public class RocketMQWebSocketMessageConsumer implements RocketMQListener<RocketMQWebSocketMessage> {
private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender;
@Override
public void onMessage(RocketMQWebSocketMessage message) {
rocketMQWebSocketMessageSender.send(message.getSessionId(),
message.getUserType(), message.getUserId(),
message.getMessageType(), message.getMessageContent());
}
}

View File

@ -1,61 +0,0 @@
package cd.casic.framework.websocket.core.sender.rocketmq;
import cd.casic.framework.websocket.core.sender.AbstractWebSocketMessageSender;
import cd.casic.framework.websocket.core.sender.WebSocketMessageSender;
import cd.casic.framework.websocket.core.session.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
/**
* 基于 RocketMQ {@link WebSocketMessageSender} 实现类
*
* @author mianbin modified from yudao
*/
@Slf4j
public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender {
private final RocketMQTemplate rocketMQTemplate;
private final String topic;
public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
RocketMQTemplate rocketMQTemplate,
String topic) {
super(sessionManager);
this.rocketMQTemplate = rocketMQTemplate;
this.topic = topic;
}
@Override
public void send(Integer userType, Long userId, String messageType, String messageContent) {
sendRocketMQMessage(null, userId, userType, messageType, messageContent);
}
@Override
public void send(Integer userType, String messageType, String messageContent) {
sendRocketMQMessage(null, null, userType, messageType, messageContent);
}
@Override
public void send(String sessionId, String messageType, String messageContent) {
sendRocketMQMessage(sessionId, null, null, messageType, messageContent);
}
/**
* 通过 RocketMQ 广播消息
*
* @param sessionId Session 编号
* @param userId 用户编号
* @param userType 用户类型
* @param messageType 消息类型
* @param messageContent 消息内容
*/
private void sendRocketMQMessage(String sessionId, Long userId, Integer userType,
String messageType, String messageContent) {
RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage()
.setSessionId(sessionId).setUserId(userId).setUserType(userType)
.setMessageType(messageType).setMessageContent(messageContent);
rocketMQTemplate.syncSend(topic, mqMessage);
}
}

View File

@ -23,7 +23,6 @@
<module>modules/module-ci-store</module>
<module>modules/module-ci-process-api</module>
<module>modules/module-ci-process-biz</module>
<module>modules/module-ci-process-biz</module>
</modules>
<description>