diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java index ba313eb05..370f65978 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java @@ -20,6 +20,18 @@ public class RocketMQConfig { */ public static final String TOPIC_IM_MSG = "TOPIC_IM_MSG"; + /** + * 系统消息消费者组 + */ + public static final String GROUP_IM_MSG = "GROUP_IM_MSG"; + + + /** + * 系统消息消费者组 + */ + public static final String GROUP_SYS_MSG = "CONSUMER_GROUP_SYS_MSG"; + + /** * 系统消息消费者组 */ @@ -33,9 +45,15 @@ public class RocketMQConfig { /** * 视频上传主题 */ - public static final String VLOG_UPLOAD_TOPIC = "VLOG_UPLOAD_TOPIC"; + public static final String TOPIC_VLOG_MSG = "TOPIC_VLOG_MSG"; - public static final String VLOG_UPLOAD_GROUP = "VLOG_UPLOAD_GROUP"; + /** + * 视频上传消费者组 + */ + public static final String GROUP_VLOG_MSG = "GROUP_VLOG_MSG"; - public static final String VLOG_UPLOAD_TAG= "upload"; + /** + * 视频上传标签 + */ + public static final String TAG_VLOG_MSG = "VLOG_MSG"; } diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java index 9b5a7ac05..cc6b4e4eb 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java @@ -42,21 +42,6 @@ public class MqUtil implements ApplicationContextAware { template = applicationContext.getBean(RocketMQTemplate.class); } - public void init() throws Exception { - // 1. 创建消费者实例,指定消费者组 - DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("YOUR_PULL_CONSUMER_GROUP"); - - consumer.setNamesrvAddr("localhost:9876"); // 替换为您的NameServer地址 - consumer.subscribe("YOUR_TOPIC", "*"); - consumer.start(); - // 3. 设置从何处开始消费(首次启动时) - //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); - // 4. 订阅主题和Tag - consumer.subscribe("YOUR_TOPIC", "*"); // 使用*消费所有Tag - // 5. 启动消费者 - consumer.start(); - System.out.println("LitePullConsumer 启动成功"); - } // public static void sendMessage(String destination, MQMessage message) { diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java index 0e784f01f..c321c36a8 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java @@ -13,6 +13,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.dromara.common.core.constant.GlobalConstants; +import org.dromara.common.mq.config.RocketMQConfig; import org.dromara.common.redis.redis.RedisCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -49,8 +50,6 @@ public class UserRocketMQConsumerManager { // 定时任务执行器,用于检测超时消费者 private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - private final String consumerGroup="user_consumer_group"; - private final String topic="MEMBER_VLOG_MSG"; @Autowired private RedisCache redisCache; @@ -149,7 +148,7 @@ public class UserRocketMQConsumerManager { private UserConsumerContext getUserConsumerContext(String userId) throws MQClientException { UserConsumerContext context = userConsumerMap.get(userId); if (context == null) { - createConsumer(userId, consumerGroup,topic); + createConsumer(userId, RocketMQConfig.GROUP_VLOG_MSG, RocketMQConfig.TOPIC_VLOG_MSG); } return userConsumerMap.get(userId); } diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/consumer/VlogUploadMessageConsumer.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/consumer/VlogUploadMessageConsumer.java index b4af18607..8947ca845 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/consumer/VlogUploadMessageConsumer.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/consumer/VlogUploadMessageConsumer.java @@ -35,9 +35,9 @@ import java.nio.file.Path; @Component @RequiredArgsConstructor @RocketMQMessageListener( - topic = RocketMQConfig.VLOG_UPLOAD_TOPIC, - consumerGroup = RocketMQConfig.VLOG_UPLOAD_GROUP, - selectorExpression = RocketMQConfig.VLOG_UPLOAD_TAG + topic = RocketMQConfig.TOPIC_VLOG_MSG, + consumerGroup = RocketMQConfig.GROUP_VLOG_MSG, + selectorExpression = RocketMQConfig.TAG_VLOG_MSG ) public class VlogUploadMessageConsumer implements RocketMQListener { private final QcCloud qcCloud; diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java index 8481f4103..5a8e96435 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java @@ -10,6 +10,7 @@ import com.wzj.soopin.content.service.VlogService; import com.wzj.soopin.member.service.IMemberService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.dromara.common.mq.config.RocketMQConfig; import org.dromara.common.mq.domain.MQMessage; import org.dromara.common.mq.enums.MQMessageType; import org.dromara.common.mq.utils.MqUtil; @@ -81,7 +82,7 @@ public class VlogPushServiceImpl implements IVlogPushService { .source("vlog_service") .sendTime(LocalDateTime.now()) .build(); - MqUtil.sendMessage("MEMBER_VLOG_MSG",HOT_VLOG_TAG, message); + MqUtil.sendMessage(RocketMQConfig.TOPIC_VLOG_MSG,HOT_VLOG_TAG, message); }); }else{ @@ -92,14 +93,14 @@ public class VlogPushServiceImpl implements IVlogPushService { vlogPage.getRecords().stream().forEach(vlogId -> { //将数据发送的mq的热点数据队列 MQMessage message = MQMessage.builder() - .topic("MEMBER_VLOG_MSG") + .topic(RocketMQConfig.TOPIC_VLOG_MSG) .tag(tag) .messageType(MQMessageType.VLOG.name()) .data(vlogId) .source("vlog_service") .sendTime(LocalDateTime.now()) .build(); - MqUtil.sendMessage("MEMBER_VLOG_MSG"+":"+tag, message); + MqUtil.sendMessage(RocketMQConfig.TOPIC_VLOG_MSG+":"+tag, message); }); } diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java index 5c1e2b268..a4375a2cb 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java @@ -139,7 +139,7 @@ public class VlogServiceImpl extends ServiceImpl implements Vl .tag("upload") .sendTime(LocalDateTime.now()) .build(); - MqUtil.sendMessage(RocketMQConfig.VLOG_UPLOAD_TOPIC + ":" + "upload", message); + MqUtil.sendMessage(RocketMQConfig.TOPIC_VLOG_MSG + ":" + RocketMQConfig.TAG_VLOG_MSG, message); } @Override diff --git a/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/MessageRocketMQConsumer.java b/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/MessageRocketMQConsumer.java index cbfa1f133..8c151b3c8 100644 --- a/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/MessageRocketMQConsumer.java +++ b/ruoyi-modules/ruoyi-im/src/main/java/com/wzj/soopin/im/consumer/MessageRocketMQConsumer.java @@ -21,7 +21,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor @RocketMQMessageListener( topic = RocketMQConfig.TOPIC_IM_MSG, - consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG, + consumerGroup = RocketMQConfig.GROUP_IM_MSG, selectorExpression = "*" // ackMode = AckMode.MANUAL )