diff --git a/ruoyi-admin/src/main/resources/application-dev.yml b/ruoyi-admin/src/main/resources/application-dev.yml index f2a1f775d..330d0286a 100644 --- a/ruoyi-admin/src/main/resources/application-dev.yml +++ b/ruoyi-admin/src/main/resources/application-dev.yml @@ -11,26 +11,6 @@ spring.boot.admin.client: username: @monitor.username@ password: @monitor.password@ ---- # 禁用RabbitMQ自动配置 -spring: - rabbitmq: - enabled: true - host: 192.168.1.65 # RabbitMQ服务器地址,本地为 localhost - port: 5672 # 端口,默认 5672 - username: wzj # 用户名,默认 guest - password: wzj123456 # 密码,默认 guest - virtual-host: /wzj # 虚拟主机,默认 / - # 可选:生产者确认机制,确保消息正确发送到Broker - publisher-confirm-type: correlated - # 可选:开启生产者回退机制,确保消息正确路由到队列 - publisher-returns: true - template: - mandatory: true - listener: - simple: - acknowledge-mode: manual # 手动ACK确认,通常更可靠。如需自动,改为 auto - prefetch: 1 # 每次预取一条消息,处理完再取下一条,避免消息积压 - --- # snail-job 配置 snail-job: enabled: true @@ -156,7 +136,7 @@ redisson: --- # RocketMQ 配置 rocketmq: # RocketMQ 服务器地址 - name-server: 82.156.121.2:9876 + name-server: 192.168.1.65:9876 # 生产者配置 producer: # 生产者组名 diff --git a/ruoyi-common/ruoyi-common-mq/pom.xml b/ruoyi-common/ruoyi-common-mq/pom.xml new file mode 100644 index 000000000..cd5f7ba10 --- /dev/null +++ b/ruoyi-common/ruoyi-common-mq/pom.xml @@ -0,0 +1,53 @@ + + + org.dromara + ruoyi-common + ${revision} + + 4.0.0 + + ruoyi-common-mq + + ruoyi-common-mq + + + + + + org.dromara + ruoyi-common-core + + + org.dromara + ruoyi-common-log + + + + + com.fasterxml.jackson.core + jackson-databind + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.3 + + + + + org.springframework + spring-messaging + + + + + com.fasterxml.jackson.core + jackson-databind + 2.15.2 + + + + + diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java index 135b64e4d..510a1655b 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java @@ -15,6 +15,11 @@ public class RocketMQConfig { */ public static final String TOPIC_SYS_MSG = "TOPIC_SYS_MSG"; + /** + * 系统消息主题 + */ + public static final String TOPIC_IM_MSG = "TOPIC_IM_MSG"; + /** * 系统消息消费者组 */ diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/domain/MQMessage.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/domain/MQMessage.java index f4112fb4c..3a47e8a0d 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/domain/MQMessage.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/domain/MQMessage.java @@ -27,7 +27,8 @@ public class MQMessage { /** * 消息类型 */ - private String messageType; + private String + messageType; /** * 消息内容 */ diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java index afbd1c16e..0e9c994ae 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java @@ -5,6 +5,7 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.dromara.common.json.utils.JsonUtils; +import org.dromara.common.mq.config.RocketMQConfig; import org.dromara.common.mq.domain.MQMessage; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; @@ -37,6 +38,21 @@ public class MqUtil implements ApplicationContextAware { log.error("发送消息到RocketMQ失败,topic: {}, message: {}", topic, message, e); } } + public static void sendIMMessage( MQMessage message) { + if (template == null) { + log.error("RocketMQTemplate未初始化,无法发送消息"); + return; + } + + try { + String jsonMessage = JsonUtils.toJsonString(message); + template.convertAndSend(RocketMQConfig.TOPIC_IM_MSG, jsonMessage); + log.info("发送消息到RocketMQ成功,topic: {}, message: {}", RocketMQConfig.TOPIC_IM_MSG, jsonMessage); + } catch (Exception e) { + log.error("发送消息到RocketMQ失败,topic: {}, message: {}", RocketMQConfig.TOPIC_IM_MSG, message, e); + } + } + public static void sendMessage(String topic, String tag, MQMessage message) { if (template == null) { diff --git a/ruoyi-modules/ruoyi-content/pom.xml b/ruoyi-modules/ruoyi-content/pom.xml index 0f33d4429..1737babe4 100644 --- a/ruoyi-modules/ruoyi-content/pom.xml +++ b/ruoyi-modules/ruoyi-content/pom.xml @@ -151,13 +151,6 @@ 4.12.0 - - org.springframework.amqp - spring-rabbit - - - - com.tencentcloudapi diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/RabbitMQConsumer.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/RabbitMQConsumer.java deleted file mode 100644 index 9ed9d08db..000000000 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/RabbitMQConsumer.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.wzj.soopin.content.controller; - - -import com.wzj.soopin.content.domain.base.RabbitMQConfig; - -import com.wzj.soopin.content.domain.mo.MessageMO; - -import com.wzj.soopin.content.enums.MessageEnum; -import com.wzj.soopin.content.service.MsgService; -import com.wzj.soopin.content.utils.JsonUtils; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * RabbitMQ消费者 - * 已禁用,改用RocketMQ - */ -@Slf4j -@Component -public class RabbitMQConsumer { - - @Autowired - private MsgService msgService; - - // @RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG}) - public void watchQueue(String payload, Message message) { - log.info(payload); - - MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class); - - String routingKey = message.getMessageProperties().getReceivedRoutingKey(); - log.info(routingKey); - - // TODO: 下面这段代码可以优化,一个地方是参数优化,另外是枚举的判断优化 - - if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) { - msgService.createMsg(messageMO.getFromUserId(), - messageMO.getToUserId(), - MessageEnum.FOLLOW_YOU.type, - null); - } else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_VLOG.enValue)) { - msgService.createMsg(messageMO.getFromUserId(), - messageMO.getToUserId(), - MessageEnum.FOLLOW_YOU.type, - messageMO.getMsgContent()); - } else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.COMMENT_VLOG.enValue)) { - msgService.createMsg(messageMO.getFromUserId(), - messageMO.getToUserId(), - MessageEnum.COMMENT_VLOG.type, - messageMO.getMsgContent()); - } else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.REPLY_YOU.enValue)) { - msgService.createMsg(messageMO.getFromUserId(), - messageMO.getToUserId(), - MessageEnum.REPLY_YOU.type, - messageMO.getMsgContent()); - } else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_COMMENT.enValue)) { - msgService.createMsg(messageMO.getFromUserId(), - messageMO.getToUserId(), - MessageEnum.LIKE_COMMENT.type, - messageMO.getMsgContent()); - } else { -// GraceException.display(ResponseStatusEnum.SYSTEM_OPERATION_ERROR); - } - - } - - -} diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/domain/base/RabbitMQConfig.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/domain/base/RabbitMQConfig.java deleted file mode 100644 index 89dad8ab8..000000000 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/domain/base/RabbitMQConfig.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.wzj.soopin.content.domain.base;//package com.imooc; - - -import org.springframework.amqp.core.*; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - - - -/** - * RabbitMQ配置类 - * 已禁用,改用RocketMQ - */ -// @Configuration -public class RabbitMQConfig { - - /** - * 根据模型编写代码: - * 1. 定义交换机 - * 2. 定义队列 - * 3. 创建交换机 - * 4. 创建队列 - * 5. 队列和交换机的绑定 - */ - - public static final String EXCHANGE_MSG = "exchange_msg"; - - public static final String QUEUE_SYS_MSG = "queue_sys_msg"; - - // @Bean(EXCHANGE_MSG) - public Exchange exchange() { - return ExchangeBuilder // 构建交换机 - .topicExchange(EXCHANGE_MSG) // 使用topic类型,参考:https://www.rabbitmq.com/getstarted.html - .durable(true) // 设置持久化,重启mq后依然存在 - .build(); - } - - // @Bean(QUEUE_SYS_MSG) - public Queue queue() { - return new Queue(QUEUE_SYS_MSG); - } - - // @Bean - public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange, - @Qualifier(QUEUE_SYS_MSG) Queue queue) { - - return BindingBuilder - .bind(queue) - .to(exchange) - .with("sys.msg.*") // 定义路由规则(requestMapping) - .noargs(); - - // FIXME: * 和 # 分别代表什么意思? - } - - -} diff --git a/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/IMMessageConsumer.java b/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/IMMessageConsumer.java deleted file mode 100644 index 2ee024c40..000000000 --- a/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/IMMessageConsumer.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.wzj.soopin.im.consumer; -import com.rabbitmq.client.Channel; -import lombok.extern.slf4j.Slf4j; -//import org.dromara.common.mq.config.properties.RabbitMQConfig; -import org.springframework.amqp.core.Message; -import org.springframework.stereotype.Component; - -import java.io.IOException; -@Slf4j -@Component -public class IMMessageConsumer { -// @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_NAME) - public void handleDirectMessage(String message, Channel channel, Message amqpMessage) throws IOException { - try { - log.info("接收到直连队列消息: {}", message); - // 业务处理逻辑 - channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false); - } catch (Exception e) { - log.error("消息处理失败", e); - channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true); - } - } -} diff --git a/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/MessageRocketMQConsumer.java b/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/MessageRocketMQConsumer.java index dffd3a94a..cde201d26 100644 --- a/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/MessageRocketMQConsumer.java +++ b/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/MessageRocketMQConsumer.java @@ -2,12 +2,14 @@ package com.wzj.soopin.im.consumer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.dromara.common.core.utils.StringUtils; import org.dromara.common.json.utils.JsonUtils; //import org.dromara.system.config.RocketMQConfig; import com.wzj.soopin.im.domain.vo.SysMessageVo; +import org.dromara.common.mq.config.RocketMQConfig; import org.dromara.system.websocket.MessageWebSocketServer; import org.springframework.stereotype.Component; @@ -19,11 +21,11 @@ import org.springframework.stereotype.Component; @Slf4j @Component @RequiredArgsConstructor -//@RocketMQMessageListener( -// topic = RocketMQConfig.TOPIC_SYS_MSG, -// consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG, -// selectorExpression = RocketMQConfig.TAG_SYS_MSG -//) +@RocketMQMessageListener( + topic = RocketMQConfig.TOPIC_IM_MSG, + consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG, + selectorExpression = RocketMQConfig.TAG_SYS_MSG +) public class MessageRocketMQConsumer implements RocketMQListener { private final MessageWebSocketServer messageWebSocketServer; diff --git a/ruoyi-modules/ruoyi-member/src/main/java/com/wzj/soopin/member/service/impl/FansServiceImpl.java b/ruoyi-modules/ruoyi-member/src/main/java/com/wzj/soopin/member/service/impl/FansServiceImpl.java index 5c6af8676..b6f48818b 100644 --- a/ruoyi-modules/ruoyi-member/src/main/java/com/wzj/soopin/member/service/impl/FansServiceImpl.java +++ b/ruoyi-modules/ruoyi-member/src/main/java/com/wzj/soopin/member/service/impl/FansServiceImpl.java @@ -75,13 +75,14 @@ public class FansServiceImpl extends ServiceImpl implements IF Map params = new HashMap<>(); params.put("followerNickname", follower.getNickname() == null ? "" : follower.getNickname()); params.put("vloggerNickname", vlogger.getNickname() == null ? "" : vlogger.getNickname()); - MQMessage message = MQMessage.builder() + params.put("action", MessageActionEnum.NEW_FOUCS.name()); + MQMessage message = MQMessage.builder() .messageType("follow") .data(params) .source("member") .build(); // 关注消息 - MqUtil.sendMessage(MessageActionEnum.NEW_FOUCS.name(), message); + MqUtil.sendIMMessage(message); } } diff --git a/script/docker/docker.txt b/script/docker/docker.txt index 4b290e939..c5be5600d 100644 --- a/script/docker/docker.txt +++ b/script/docker/docker.txt @@ -94,3 +94,31 @@ docker run -d \ --privileged \ minio/minio:RELEASE.2023-04-13T03-08-07Z \ server --address ':9000' --console-address ':9001' /data + +rocket-mq + +docker run -d --name rocketmq-namesrv --network rocketmq-single-network -p 9876:9876 -v /opt/rocketmq-single/namesrv/logs:/root/logs -v /opt/rocketmq-single/namesrv/store:/root/store --restart=always apache/rocketmq:4.9.4 sh mqnamesrv + + +docker run -d \ + --name rocketmq-broker \ + --network rocketmq-single-network \ + -p 10911:10911 \ + -p 10909:10909 \ + -v /var/local/docker/rocketmq/broker/logs:/root/logs \ + -v /var/local/docker/rocketmq/broker/store:/root/store \ + -v /var/local/docker/rocketmq/broker/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf \ + -e "NAMESRV_ADDR=rocketmq-namesrv:9876" \ + -e "ROCKETMQ_HOME=/home/rocketmq/rocketmq-4.9.4" \ + --restart=always \ + apache/rocketmq:4.9.4 \ + sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf + + +docker run -d \ + --name rocketmq-console \ + --network rocketmq-single-network \ + -p 8081:8080 \ + -e "JAVA_OPTS=-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \ + --restart=always \ + styletang/rocketmq-console-ng:latest diff --git a/script/docker/rocketmq/broker.conf b/script/docker/rocketmq/broker.conf new file mode 100644 index 000000000..6dc798667 --- /dev/null +++ b/script/docker/rocketmq/broker.conf @@ -0,0 +1,9 @@ +brokerClusterName = DefaultCluster +brokerName = broker-a +brokerId = 0 +# 单机模式设为异步主节点,满足基础消息收发需求 +brokerRole = ASYNC_MASTER +# 异步刷盘,平衡性能与可靠性(单机场景适用) +flushDiskType = ASYNC_FLUSH +# 关键配置:外部客户端(如Java程序)访问Broker的IP,必须是宿主机可被外部访问的IP +brokerIP1 = 192.168.1.65