From 24544a18a8e2c2341702186cf569b5c8fcbcaacc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E4=BD=B3=E8=B1=AA?= Date: Fri, 20 Jun 2025 20:13:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B6=88=E6=81=AF=E9=80=9A?= =?UTF-8?q?=E8=BF=87WebSocket=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dromara/system/config/RocketMQConfig.java | 27 +++++ .../system/config/RocketMQConfiguration.java | 50 +++++++++ .../consumer/MessageRocketMQConsumer.java | 105 ++++++++++++++++++ .../system/service/IRocketMQService.java | 36 ++++++ .../service/impl/RocketMQServiceImpl.java | 88 +++++++++++++++ .../main/resources/META-INF/spring.factories | 2 + ...ot.autoconfigure.AutoConfiguration.imports | 1 + 7 files changed, 309 insertions(+) create mode 100644 ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/config/RocketMQConfig.java create mode 100644 ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/config/RocketMQConfiguration.java create mode 100644 ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/consumer/MessageRocketMQConsumer.java create mode 100644 ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/IRocketMQService.java create mode 100644 ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/RocketMQServiceImpl.java create mode 100644 ruoyi-modules/ruoyi-system/src/main/resources/META-INF/spring.factories create mode 100644 ruoyi-modules/ruoyi-system/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports diff --git a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/config/RocketMQConfig.java b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/config/RocketMQConfig.java new file mode 100644 index 000000000..c0b5fa45f --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/config/RocketMQConfig.java @@ -0,0 +1,27 @@ +package org.dromara.system.config; + +import org.springframework.context.annotation.Configuration; + +/** + * RocketMQ 配置类 + * + * @author wzj + */ +@Configuration +public class RocketMQConfig { + + /** + * 系统消息主题 + */ + public static final String TOPIC_SYS_MSG = "TOPIC_SYS_MSG"; + + /** + * 系统消息消费者组 + */ + public static final String CONSUMER_GROUP_SYS_MSG = "CONSUMER_GROUP_SYS_MSG"; + + /** + * 系统消息标签 + */ + public static final String TAG_SYS_MSG = "SYS_MSG"; +} \ No newline at end of file diff --git a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/config/RocketMQConfiguration.java b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/config/RocketMQConfiguration.java new file mode 100644 index 000000000..2d30b5af9 --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/config/RocketMQConfiguration.java @@ -0,0 +1,50 @@ +package org.dromara.system.config; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +/** + * RocketMQ配置类 + * 确保RocketMQTemplate被正确初始化 + * + * @author wzj + */ +@Configuration +@EnableConfigurationProperties(RocketMQProperties.class) +@ConditionalOnClass(DefaultMQProducer.class) +@Import(RocketMQAutoConfiguration.class) +public class RocketMQConfiguration { + + /** + * 确保RocketMQTemplate被正确初始化 + * 这个Bean会由RocketMQAutoConfiguration自动创建 + * 此处仅作为备用方案 + */ + @Bean + @ConditionalOnMissingBean(RocketMQTemplate.class) + @ConditionalOnBean(DefaultMQProducer.class) + public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, + RocketMQMessageConverter messageConverter) { + RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); + rocketMQTemplate.setProducer(mqProducer); + rocketMQTemplate.setMessageConverter(messageConverter.getMessageConverter()); + return rocketMQTemplate; + } + + @Bean + @ConditionalOnMissingBean(RocketMQMessageConverter.class) + public RocketMQMessageConverter rocketMQMessageConverter() { + return new RocketMQMessageConverter(); + } +} \ No newline at end of file diff --git a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/consumer/MessageRocketMQConsumer.java b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/consumer/MessageRocketMQConsumer.java new file mode 100644 index 000000000..33062a200 --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/consumer/MessageRocketMQConsumer.java @@ -0,0 +1,105 @@ +package org.dromara.system.consumer; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; + +import org.dromara.common.core.utils.StringUtils; +import org.dromara.common.json.utils.JsonUtils; +import org.dromara.system.config.RocketMQConfig; +import org.dromara.system.domain.SysMessageUser; +import org.dromara.system.domain.vo.SysMessageVo; +import org.dromara.system.mapper.SysMessageUserMapper; +import org.dromara.system.websocket.MessageWebSocketServer; +import org.springframework.stereotype.Component; + +import java.util.Date; + +/** + * RocketMQ消息消费者 + * + * @author wzj + */ +@Slf4j +@Component +@RequiredArgsConstructor +@RocketMQMessageListener( + topic = RocketMQConfig.TOPIC_SYS_MSG, + consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG, + selectorExpression = RocketMQConfig.TAG_SYS_MSG +) +public class MessageRocketMQConsumer implements RocketMQListener { + + private final MessageWebSocketServer messageWebSocketServer; + private final SysMessageUserMapper messageUserMapper; + + @Override + public void onMessage(String message) { + try { + log.info("接收到RocketMQ消息: {}", message); + // 解析消息格式,格式为: {"userId": "123", "message": {...}} + MessageWrapper wrapper = JsonUtils.parseObject(message, MessageWrapper.class); + if (wrapper != null && wrapper.getUserId() != null && wrapper.getMessage() != null) { + // 将String类型的userId转换为Long类型 + Long userIdLong = null; + try { + if (StringUtils.isNotBlank(wrapper.getUserId())) { + userIdLong = Long.parseLong(wrapper.getUserId()); + } + } catch (NumberFormatException e) { + log.error("用户ID转换失败: {}", wrapper.getUserId(), e); + return; + } + + if (userIdLong != null) { + // 保存消息用户关联记录 + try { + SysMessageUser messageUser = new SysMessageUser(); + messageUser.setMessageId(wrapper.getMessage().getId()); + messageUser.setUserId(userIdLong); + messageUser.setIsRead(false); + messageUserMapper.insert(messageUser); + log.info("消息用户关联记录保存成功,messageId: {}, userId: {}", wrapper.getMessage().getId(), userIdLong); + } catch (Exception e) { + log.error("保存消息用户关联记录失败", e); + } + + // 发送WebSocket消息 + messageWebSocketServer.sendMessage(userIdLong, JsonUtils.toJsonString(wrapper.getMessage())); + log.info("通过WebSocket发送消息成功,userId: {}", userIdLong); + } else { + log.warn("用户ID为空或无效: {}", wrapper.getUserId()); + } + } else { + log.warn("消息格式不正确: {}", message); + } + } catch (Exception e) { + log.error("处理RocketMQ消息失败", e); + } + } + + /** + * 消息包装类 + */ + public static class MessageWrapper { + private String userId; + private SysMessageVo message; + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public SysMessageVo getMessage() { + return message; + } + + public void setMessage(SysMessageVo message) { + this.message = message; + } + } +} diff --git a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/IRocketMQService.java b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/IRocketMQService.java new file mode 100644 index 000000000..8887a6658 --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/IRocketMQService.java @@ -0,0 +1,36 @@ +package org.dromara.system.service; + +/** + * RocketMQ服务接口 + * + * @author wzj + */ +public interface IRocketMQService { + + /** + * 发送消息到指定主题 + * + * @param topic 主题 + * @param message 消息内容 + */ + void sendMessage(String topic, Object message); + + /** + * 发送消息到指定主题和标签 + * + * @param topic 主题 + * @param tag 标签 + * @param message 消息内容 + */ + void sendMessage(String topic, String tag, Object message); + + /** + * 发送延迟消息 + * + * @param topic 主题 + * @param tag 标签 + * @param message 消息内容 + * @param delayTimeLevel 延迟级别 (1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h) + */ + void sendDelayMessage(String topic, String tag, Object message, int delayTimeLevel); +} \ No newline at end of file diff --git a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/RocketMQServiceImpl.java b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/RocketMQServiceImpl.java new file mode 100644 index 000000000..5e8049f7a --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/RocketMQServiceImpl.java @@ -0,0 +1,88 @@ +package org.dromara.system.service.impl; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.dromara.common.json.utils.JsonUtils; +import org.dromara.system.service.IRocketMQService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +/** + * RocketMQ服务实现类 + * + * @author wzj + */ +@Slf4j +@Service +public class RocketMQServiceImpl implements IRocketMQService { + + @Autowired(required = false) + private RocketMQTemplate rocketMQTemplate; + + @Override + public void sendMessage(String topic, Object message) { + if (rocketMQTemplate == null) { + log.error("RocketMQTemplate未初始化,无法发送消息"); + return; + } + + try { + String jsonMessage = JsonUtils.toJsonString(message); + rocketMQTemplate.convertAndSend(topic, jsonMessage); + log.info("发送消息到RocketMQ成功,topic: {}, message: {}", topic, jsonMessage); + } catch (Exception e) { + log.error("发送消息到RocketMQ失败,topic: {}, message: {}", topic, message, e); + } + } + + @Override + public void sendMessage(String topic, String tag, Object message) { + if (rocketMQTemplate == null) { + log.error("RocketMQTemplate未初始化,无法发送消息"); + return; + } + + try { + String destination = topic + ":" + tag; + String jsonMessage = JsonUtils.toJsonString(message); + rocketMQTemplate.convertAndSend(destination, jsonMessage); + log.info("发送消息到RocketMQ成功,destination: {}, message: {}", destination, jsonMessage); + } catch (Exception e) { + log.error("发送消息到RocketMQ失败,topic: {}, tag: {}, message: {}", topic, tag, message, e); + } + } + + @Override + public void sendDelayMessage(String topic, String tag, Object message, int delayTimeLevel) { + if (rocketMQTemplate == null) { + log.error("RocketMQTemplate未初始化,无法发送延迟消息"); + return; + } + + try { + String destination = topic + ":" + tag; + String jsonMessage = JsonUtils.toJsonString(message); + rocketMQTemplate.asyncSend(destination, MessageBuilder.withPayload(jsonMessage).build(), + new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("发送延迟消息到RocketMQ成功,destination: {}, message: {}, result: {}", + destination, jsonMessage, sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("发送延迟消息到RocketMQ异常,destination: {}, message: {}", + destination, jsonMessage, throwable); + } + }, 3000, delayTimeLevel); + } catch (Exception e) { + log.error("发送延迟消息到RocketMQ失败,topic: {}, tag: {}, message: {}", topic, tag, message, e); + } + } +} + \ No newline at end of file diff --git a/ruoyi-modules/ruoyi-system/src/main/resources/META-INF/spring.factories b/ruoyi-modules/ruoyi-system/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..a13a85b2b --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.dromara.system.config.RocketMQConfiguration \ No newline at end of file diff --git a/ruoyi-modules/ruoyi-system/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-modules/ruoyi-system/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 000000000..ea40cbdfa --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.dromara.system.config.RocketMQConfig \ No newline at end of file