修改消息通过WebSocket发送

This commit is contained in:
曹佳豪 2025-06-20 20:13:57 +08:00
parent 52b3be7310
commit 24544a18a8
7 changed files with 309 additions and 0 deletions

View File

@ -0,0 +1,27 @@
package org.dromara.system.config;
import org.springframework.context.annotation.Configuration;
/**
* RocketMQ 配置类
*
* @author wzj
*/
@Configuration
public class RocketMQConfig {
/**
* 系统消息主题
*/
public static final String TOPIC_SYS_MSG = "TOPIC_SYS_MSG";
/**
* 系统消息消费者组
*/
public static final String CONSUMER_GROUP_SYS_MSG = "CONSUMER_GROUP_SYS_MSG";
/**
* 系统消息标签
*/
public static final String TAG_SYS_MSG = "SYS_MSG";
}

View File

@ -0,0 +1,50 @@
package org.dromara.system.config;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* RocketMQ配置类
* 确保RocketMQTemplate被正确初始化
*
* @author wzj
*/
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass(DefaultMQProducer.class)
@Import(RocketMQAutoConfiguration.class)
public class RocketMQConfiguration {
/**
* 确保RocketMQTemplate被正确初始化
* 这个Bean会由RocketMQAutoConfiguration自动创建
* 此处仅作为备用方案
*/
@Bean
@ConditionalOnMissingBean(RocketMQTemplate.class)
@ConditionalOnBean(DefaultMQProducer.class)
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
RocketMQMessageConverter messageConverter) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(mqProducer);
rocketMQTemplate.setMessageConverter(messageConverter.getMessageConverter());
return rocketMQTemplate;
}
@Bean
@ConditionalOnMissingBean(RocketMQMessageConverter.class)
public RocketMQMessageConverter rocketMQMessageConverter() {
return new RocketMQMessageConverter();
}
}

View File

@ -0,0 +1,105 @@
package org.dromara.system.consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.system.config.RocketMQConfig;
import org.dromara.system.domain.SysMessageUser;
import org.dromara.system.domain.vo.SysMessageVo;
import org.dromara.system.mapper.SysMessageUserMapper;
import org.dromara.system.websocket.MessageWebSocketServer;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* RocketMQ消息消费者
*
* @author wzj
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = RocketMQConfig.TOPIC_SYS_MSG,
consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG,
selectorExpression = RocketMQConfig.TAG_SYS_MSG
)
public class MessageRocketMQConsumer implements RocketMQListener<String> {
private final MessageWebSocketServer messageWebSocketServer;
private final SysMessageUserMapper messageUserMapper;
@Override
public void onMessage(String message) {
try {
log.info("接收到RocketMQ消息: {}", message);
// 解析消息格式格式为: {"userId": "123", "message": {...}}
MessageWrapper wrapper = JsonUtils.parseObject(message, MessageWrapper.class);
if (wrapper != null && wrapper.getUserId() != null && wrapper.getMessage() != null) {
// 将String类型的userId转换为Long类型
Long userIdLong = null;
try {
if (StringUtils.isNotBlank(wrapper.getUserId())) {
userIdLong = Long.parseLong(wrapper.getUserId());
}
} catch (NumberFormatException e) {
log.error("用户ID转换失败: {}", wrapper.getUserId(), e);
return;
}
if (userIdLong != null) {
// 保存消息用户关联记录
try {
SysMessageUser messageUser = new SysMessageUser();
messageUser.setMessageId(wrapper.getMessage().getId());
messageUser.setUserId(userIdLong);
messageUser.setIsRead(false);
messageUserMapper.insert(messageUser);
log.info("消息用户关联记录保存成功messageId: {}, userId: {}", wrapper.getMessage().getId(), userIdLong);
} catch (Exception e) {
log.error("保存消息用户关联记录失败", e);
}
// 发送WebSocket消息
messageWebSocketServer.sendMessage(userIdLong, JsonUtils.toJsonString(wrapper.getMessage()));
log.info("通过WebSocket发送消息成功userId: {}", userIdLong);
} else {
log.warn("用户ID为空或无效: {}", wrapper.getUserId());
}
} else {
log.warn("消息格式不正确: {}", message);
}
} catch (Exception e) {
log.error("处理RocketMQ消息失败", e);
}
}
/**
* 消息包装类
*/
public static class MessageWrapper {
private String userId;
private SysMessageVo message;
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public SysMessageVo getMessage() {
return message;
}
public void setMessage(SysMessageVo message) {
this.message = message;
}
}
}

View File

@ -0,0 +1,36 @@
package org.dromara.system.service;
/**
* RocketMQ服务接口
*
* @author wzj
*/
public interface IRocketMQService {
/**
* 发送消息到指定主题
*
* @param topic 主题
* @param message 消息内容
*/
void sendMessage(String topic, Object message);
/**
* 发送消息到指定主题和标签
*
* @param topic 主题
* @param tag 标签
* @param message 消息内容
*/
void sendMessage(String topic, String tag, Object message);
/**
* 发送延迟消息
*
* @param topic 主题
* @param tag 标签
* @param message 消息内容
* @param delayTimeLevel 延迟级别 (1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)
*/
void sendDelayMessage(String topic, String tag, Object message, int delayTimeLevel);
}

View File

@ -0,0 +1,88 @@
package org.dromara.system.service.impl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.system.service.IRocketMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
* RocketMQ服务实现类
*
* @author wzj
*/
@Slf4j
@Service
public class RocketMQServiceImpl implements IRocketMQService {
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
@Override
public void sendMessage(String topic, Object message) {
if (rocketMQTemplate == null) {
log.error("RocketMQTemplate未初始化无法发送消息");
return;
}
try {
String jsonMessage = JsonUtils.toJsonString(message);
rocketMQTemplate.convertAndSend(topic, jsonMessage);
log.info("发送消息到RocketMQ成功topic: {}, message: {}", topic, jsonMessage);
} catch (Exception e) {
log.error("发送消息到RocketMQ失败topic: {}, message: {}", topic, message, e);
}
}
@Override
public void sendMessage(String topic, String tag, Object message) {
if (rocketMQTemplate == null) {
log.error("RocketMQTemplate未初始化无法发送消息");
return;
}
try {
String destination = topic + ":" + tag;
String jsonMessage = JsonUtils.toJsonString(message);
rocketMQTemplate.convertAndSend(destination, jsonMessage);
log.info("发送消息到RocketMQ成功destination: {}, message: {}", destination, jsonMessage);
} catch (Exception e) {
log.error("发送消息到RocketMQ失败topic: {}, tag: {}, message: {}", topic, tag, message, e);
}
}
@Override
public void sendDelayMessage(String topic, String tag, Object message, int delayTimeLevel) {
if (rocketMQTemplate == null) {
log.error("RocketMQTemplate未初始化无法发送延迟消息");
return;
}
try {
String destination = topic + ":" + tag;
String jsonMessage = JsonUtils.toJsonString(message);
rocketMQTemplate.asyncSend(destination, MessageBuilder.withPayload(jsonMessage).build(),
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送延迟消息到RocketMQ成功destination: {}, message: {}, result: {}",
destination, jsonMessage, sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("发送延迟消息到RocketMQ异常destination: {}, message: {}",
destination, jsonMessage, throwable);
}
}, 3000, delayTimeLevel);
} catch (Exception e) {
log.error("发送延迟消息到RocketMQ失败topic: {}, tag: {}, message: {}", topic, tag, message, e);
}
}
}

View File

@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.dromara.system.config.RocketMQConfiguration