[fix]修改mq为rockectmq
This commit is contained in:
parent
329d25d4da
commit
7bb559d294
@ -11,26 +11,6 @@ spring.boot.admin.client:
|
|||||||
username: @monitor.username@
|
username: @monitor.username@
|
||||||
password: @monitor.password@
|
password: @monitor.password@
|
||||||
|
|
||||||
--- # 禁用RabbitMQ自动配置
|
|
||||||
spring:
|
|
||||||
rabbitmq:
|
|
||||||
enabled: true
|
|
||||||
host: 192.168.1.65 # RabbitMQ服务器地址,本地为 localhost
|
|
||||||
port: 5672 # 端口,默认 5672
|
|
||||||
username: wzj # 用户名,默认 guest
|
|
||||||
password: wzj123456 # 密码,默认 guest
|
|
||||||
virtual-host: /wzj # 虚拟主机,默认 /
|
|
||||||
# 可选:生产者确认机制,确保消息正确发送到Broker
|
|
||||||
publisher-confirm-type: correlated
|
|
||||||
# 可选:开启生产者回退机制,确保消息正确路由到队列
|
|
||||||
publisher-returns: true
|
|
||||||
template:
|
|
||||||
mandatory: true
|
|
||||||
listener:
|
|
||||||
simple:
|
|
||||||
acknowledge-mode: manual # 手动ACK确认,通常更可靠。如需自动,改为 auto
|
|
||||||
prefetch: 1 # 每次预取一条消息,处理完再取下一条,避免消息积压
|
|
||||||
|
|
||||||
--- # snail-job 配置
|
--- # snail-job 配置
|
||||||
snail-job:
|
snail-job:
|
||||||
enabled: true
|
enabled: true
|
||||||
@ -156,7 +136,7 @@ redisson:
|
|||||||
--- # RocketMQ 配置
|
--- # RocketMQ 配置
|
||||||
rocketmq:
|
rocketmq:
|
||||||
# RocketMQ 服务器地址
|
# RocketMQ 服务器地址
|
||||||
name-server: 82.156.121.2:9876
|
name-server: 192.168.1.65:9876
|
||||||
# 生产者配置
|
# 生产者配置
|
||||||
producer:
|
producer:
|
||||||
# 生产者组名
|
# 生产者组名
|
||||||
|
53
ruoyi-common/ruoyi-common-mq/pom.xml
Normal file
53
ruoyi-common/ruoyi-common-mq/pom.xml
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||||
|
<parent>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>ruoyi-common</artifactId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>ruoyi-common-mq</artifactId>
|
||||||
|
<description>
|
||||||
|
ruoyi-common-mq
|
||||||
|
</description>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<!-- RuoYi Common Core-->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>ruoyi-common-core</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>ruoyi-common-log</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Jackson for JSON -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<!-- RocketMQ -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.rocketmq</groupId>
|
||||||
|
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||||
|
<version>2.2.3</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Spring Messaging for RocketMQ -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework</groupId>
|
||||||
|
<artifactId>spring-messaging</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Jackson核心依赖 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
<version>2.15.2</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
@ -15,6 +15,11 @@ public class RocketMQConfig {
|
|||||||
*/
|
*/
|
||||||
public static final String TOPIC_SYS_MSG = "TOPIC_SYS_MSG";
|
public static final String TOPIC_SYS_MSG = "TOPIC_SYS_MSG";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 系统消息主题
|
||||||
|
*/
|
||||||
|
public static final String TOPIC_IM_MSG = "TOPIC_IM_MSG";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 系统消息消费者组
|
* 系统消息消费者组
|
||||||
*/
|
*/
|
||||||
|
@ -27,7 +27,8 @@ public class MQMessage {
|
|||||||
/**
|
/**
|
||||||
* 消息类型
|
* 消息类型
|
||||||
*/
|
*/
|
||||||
private String messageType;
|
private String
|
||||||
|
messageType;
|
||||||
/**
|
/**
|
||||||
* 消息内容
|
* 消息内容
|
||||||
*/
|
*/
|
||||||
|
@ -5,6 +5,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
|
|||||||
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||||
import org.dromara.common.json.utils.JsonUtils;
|
import org.dromara.common.json.utils.JsonUtils;
|
||||||
|
import org.dromara.common.mq.config.RocketMQConfig;
|
||||||
import org.dromara.common.mq.domain.MQMessage;
|
import org.dromara.common.mq.domain.MQMessage;
|
||||||
import org.springframework.beans.BeansException;
|
import org.springframework.beans.BeansException;
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
@ -37,6 +38,21 @@ public class MqUtil implements ApplicationContextAware {
|
|||||||
log.error("发送消息到RocketMQ失败,topic: {}, message: {}", topic, message, e);
|
log.error("发送消息到RocketMQ失败,topic: {}, message: {}", topic, message, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
public static void sendIMMessage( MQMessage message) {
|
||||||
|
if (template == null) {
|
||||||
|
log.error("RocketMQTemplate未初始化,无法发送消息");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
String jsonMessage = JsonUtils.toJsonString(message);
|
||||||
|
template.convertAndSend(RocketMQConfig.TOPIC_IM_MSG, jsonMessage);
|
||||||
|
log.info("发送消息到RocketMQ成功,topic: {}, message: {}", RocketMQConfig.TOPIC_IM_MSG, jsonMessage);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("发送消息到RocketMQ失败,topic: {}, message: {}", RocketMQConfig.TOPIC_IM_MSG, message, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void sendMessage(String topic, String tag, MQMessage message) {
|
public static void sendMessage(String topic, String tag, MQMessage message) {
|
||||||
if (template == null) {
|
if (template == null) {
|
||||||
|
@ -151,13 +151,6 @@
|
|||||||
<version>4.12.0</version>
|
<version>4.12.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.amqp</groupId>
|
|
||||||
<artifactId>spring-rabbit</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<!-- 腾讯云SDK公共模块 -->
|
<!-- 腾讯云SDK公共模块 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.tencentcloudapi</groupId>
|
<groupId>com.tencentcloudapi</groupId>
|
||||||
|
@ -1,71 +0,0 @@
|
|||||||
package com.wzj.soopin.content.controller;
|
|
||||||
|
|
||||||
|
|
||||||
import com.wzj.soopin.content.domain.base.RabbitMQConfig;
|
|
||||||
|
|
||||||
import com.wzj.soopin.content.domain.mo.MessageMO;
|
|
||||||
|
|
||||||
import com.wzj.soopin.content.enums.MessageEnum;
|
|
||||||
import com.wzj.soopin.content.service.MsgService;
|
|
||||||
import com.wzj.soopin.content.utils.JsonUtils;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.amqp.core.Message;
|
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* RabbitMQ消费者
|
|
||||||
* 已禁用,改用RocketMQ
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
public class RabbitMQConsumer {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private MsgService msgService;
|
|
||||||
|
|
||||||
// @RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
|
|
||||||
public void watchQueue(String payload, Message message) {
|
|
||||||
log.info(payload);
|
|
||||||
|
|
||||||
MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);
|
|
||||||
|
|
||||||
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
|
|
||||||
log.info(routingKey);
|
|
||||||
|
|
||||||
// TODO: 下面这段代码可以优化,一个地方是参数优化,另外是枚举的判断优化
|
|
||||||
|
|
||||||
if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {
|
|
||||||
msgService.createMsg(messageMO.getFromUserId(),
|
|
||||||
messageMO.getToUserId(),
|
|
||||||
MessageEnum.FOLLOW_YOU.type,
|
|
||||||
null);
|
|
||||||
} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_VLOG.enValue)) {
|
|
||||||
msgService.createMsg(messageMO.getFromUserId(),
|
|
||||||
messageMO.getToUserId(),
|
|
||||||
MessageEnum.FOLLOW_YOU.type,
|
|
||||||
messageMO.getMsgContent());
|
|
||||||
} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.COMMENT_VLOG.enValue)) {
|
|
||||||
msgService.createMsg(messageMO.getFromUserId(),
|
|
||||||
messageMO.getToUserId(),
|
|
||||||
MessageEnum.COMMENT_VLOG.type,
|
|
||||||
messageMO.getMsgContent());
|
|
||||||
} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.REPLY_YOU.enValue)) {
|
|
||||||
msgService.createMsg(messageMO.getFromUserId(),
|
|
||||||
messageMO.getToUserId(),
|
|
||||||
MessageEnum.REPLY_YOU.type,
|
|
||||||
messageMO.getMsgContent());
|
|
||||||
} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_COMMENT.enValue)) {
|
|
||||||
msgService.createMsg(messageMO.getFromUserId(),
|
|
||||||
messageMO.getToUserId(),
|
|
||||||
MessageEnum.LIKE_COMMENT.type,
|
|
||||||
messageMO.getMsgContent());
|
|
||||||
} else {
|
|
||||||
// GraceException.display(ResponseStatusEnum.SYSTEM_OPERATION_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
@ -1,59 +0,0 @@
|
|||||||
package com.wzj.soopin.content.domain.base;//package com.imooc;
|
|
||||||
|
|
||||||
|
|
||||||
import org.springframework.amqp.core.*;
|
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* RabbitMQ配置类
|
|
||||||
* 已禁用,改用RocketMQ
|
|
||||||
*/
|
|
||||||
// @Configuration
|
|
||||||
public class RabbitMQConfig {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据模型编写代码:
|
|
||||||
* 1. 定义交换机
|
|
||||||
* 2. 定义队列
|
|
||||||
* 3. 创建交换机
|
|
||||||
* 4. 创建队列
|
|
||||||
* 5. 队列和交换机的绑定
|
|
||||||
*/
|
|
||||||
|
|
||||||
public static final String EXCHANGE_MSG = "exchange_msg";
|
|
||||||
|
|
||||||
public static final String QUEUE_SYS_MSG = "queue_sys_msg";
|
|
||||||
|
|
||||||
// @Bean(EXCHANGE_MSG)
|
|
||||||
public Exchange exchange() {
|
|
||||||
return ExchangeBuilder // 构建交换机
|
|
||||||
.topicExchange(EXCHANGE_MSG) // 使用topic类型,参考:https://www.rabbitmq.com/getstarted.html
|
|
||||||
.durable(true) // 设置持久化,重启mq后依然存在
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Bean(QUEUE_SYS_MSG)
|
|
||||||
public Queue queue() {
|
|
||||||
return new Queue(QUEUE_SYS_MSG);
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Bean
|
|
||||||
public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,
|
|
||||||
@Qualifier(QUEUE_SYS_MSG) Queue queue) {
|
|
||||||
|
|
||||||
return BindingBuilder
|
|
||||||
.bind(queue)
|
|
||||||
.to(exchange)
|
|
||||||
.with("sys.msg.*") // 定义路由规则(requestMapping)
|
|
||||||
.noargs();
|
|
||||||
|
|
||||||
// FIXME: * 和 # 分别代表什么意思?
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
@ -1,23 +0,0 @@
|
|||||||
package com.wzj.soopin.im.consumer;
|
|
||||||
import com.rabbitmq.client.Channel;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
//import org.dromara.common.mq.config.properties.RabbitMQConfig;
|
|
||||||
import org.springframework.amqp.core.Message;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
public class IMMessageConsumer {
|
|
||||||
// @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_NAME)
|
|
||||||
public void handleDirectMessage(String message, Channel channel, Message amqpMessage) throws IOException {
|
|
||||||
try {
|
|
||||||
log.info("接收到直连队列消息: {}", message);
|
|
||||||
// 业务处理逻辑
|
|
||||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("消息处理失败", e);
|
|
||||||
channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -2,12 +2,14 @@ package com.wzj.soopin.im.consumer;
|
|||||||
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
|
||||||
import org.dromara.common.core.utils.StringUtils;
|
import org.dromara.common.core.utils.StringUtils;
|
||||||
import org.dromara.common.json.utils.JsonUtils;
|
import org.dromara.common.json.utils.JsonUtils;
|
||||||
//import org.dromara.system.config.RocketMQConfig;
|
//import org.dromara.system.config.RocketMQConfig;
|
||||||
import com.wzj.soopin.im.domain.vo.SysMessageVo;
|
import com.wzj.soopin.im.domain.vo.SysMessageVo;
|
||||||
|
import org.dromara.common.mq.config.RocketMQConfig;
|
||||||
import org.dromara.system.websocket.MessageWebSocketServer;
|
import org.dromara.system.websocket.MessageWebSocketServer;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@ -19,11 +21,11 @@ import org.springframework.stereotype.Component;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
//@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
// topic = RocketMQConfig.TOPIC_SYS_MSG,
|
topic = RocketMQConfig.TOPIC_IM_MSG,
|
||||||
// consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG,
|
consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG,
|
||||||
// selectorExpression = RocketMQConfig.TAG_SYS_MSG
|
selectorExpression = RocketMQConfig.TAG_SYS_MSG
|
||||||
//)
|
)
|
||||||
public class MessageRocketMQConsumer implements RocketMQListener<String> {
|
public class MessageRocketMQConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
private final MessageWebSocketServer messageWebSocketServer;
|
private final MessageWebSocketServer messageWebSocketServer;
|
||||||
|
@ -75,13 +75,14 @@ public class FansServiceImpl extends ServiceImpl<FansMapper, Fans> implements IF
|
|||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
params.put("followerNickname", follower.getNickname() == null ? "" : follower.getNickname());
|
params.put("followerNickname", follower.getNickname() == null ? "" : follower.getNickname());
|
||||||
params.put("vloggerNickname", vlogger.getNickname() == null ? "" : vlogger.getNickname());
|
params.put("vloggerNickname", vlogger.getNickname() == null ? "" : vlogger.getNickname());
|
||||||
MQMessage message = MQMessage.builder()
|
params.put("action", MessageActionEnum.NEW_FOUCS.name());
|
||||||
|
MQMessage message = MQMessage.builder()
|
||||||
.messageType("follow")
|
.messageType("follow")
|
||||||
.data(params)
|
.data(params)
|
||||||
.source("member")
|
.source("member")
|
||||||
.build();
|
.build();
|
||||||
// 关注消息
|
// 关注消息
|
||||||
MqUtil.sendMessage(MessageActionEnum.NEW_FOUCS.name(), message);
|
MqUtil.sendIMMessage(message);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -94,3 +94,31 @@ docker run -d \
|
|||||||
--privileged \
|
--privileged \
|
||||||
minio/minio:RELEASE.2023-04-13T03-08-07Z \
|
minio/minio:RELEASE.2023-04-13T03-08-07Z \
|
||||||
server --address ':9000' --console-address ':9001' /data
|
server --address ':9000' --console-address ':9001' /data
|
||||||
|
|
||||||
|
rocket-mq
|
||||||
|
|
||||||
|
docker run -d --name rocketmq-namesrv --network rocketmq-single-network -p 9876:9876 -v /opt/rocketmq-single/namesrv/logs:/root/logs -v /opt/rocketmq-single/namesrv/store:/root/store --restart=always apache/rocketmq:4.9.4 sh mqnamesrv
|
||||||
|
|
||||||
|
|
||||||
|
docker run -d \
|
||||||
|
--name rocketmq-broker \
|
||||||
|
--network rocketmq-single-network \
|
||||||
|
-p 10911:10911 \
|
||||||
|
-p 10909:10909 \
|
||||||
|
-v /var/local/docker/rocketmq/broker/logs:/root/logs \
|
||||||
|
-v /var/local/docker/rocketmq/broker/store:/root/store \
|
||||||
|
-v /var/local/docker/rocketmq/broker/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf \
|
||||||
|
-e "NAMESRV_ADDR=rocketmq-namesrv:9876" \
|
||||||
|
-e "ROCKETMQ_HOME=/home/rocketmq/rocketmq-4.9.4" \
|
||||||
|
--restart=always \
|
||||||
|
apache/rocketmq:4.9.4 \
|
||||||
|
sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf
|
||||||
|
|
||||||
|
|
||||||
|
docker run -d \
|
||||||
|
--name rocketmq-console \
|
||||||
|
--network rocketmq-single-network \
|
||||||
|
-p 8081:8080 \
|
||||||
|
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
|
||||||
|
--restart=always \
|
||||||
|
styletang/rocketmq-console-ng:latest
|
||||||
|
9
script/docker/rocketmq/broker.conf
Normal file
9
script/docker/rocketmq/broker.conf
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
brokerClusterName = DefaultCluster
|
||||||
|
brokerName = broker-a
|
||||||
|
brokerId = 0
|
||||||
|
# 单机模式设为异步主节点,满足基础消息收发需求
|
||||||
|
brokerRole = ASYNC_MASTER
|
||||||
|
# 异步刷盘,平衡性能与可靠性(单机场景适用)
|
||||||
|
flushDiskType = ASYNC_FLUSH
|
||||||
|
# 关键配置:外部客户端(如Java程序)访问Broker的IP,必须是宿主机可被外部访问的IP
|
||||||
|
brokerIP1 = 192.168.1.65
|
Loading…
x
Reference in New Issue
Block a user