From 895421fa8b58ae5b5f48ab4e1a3d108afd06c6e7 Mon Sep 17 00:00:00 2001 From: wangqx Date: Thu, 18 Sep 2025 16:57:57 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=E4=BF=AE=E6=94=B9=E7=94=9F=E4=BA=A7?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E9=85=8D=E7=BD=AE=EF=BC=8C=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=80=85=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application-prod.yml | 4 +- .../mq/utils/UserRocketMQConsumerManager.java | 70 +++++++++++-------- .../service/impl/VlogPullServiceImpl.java | 16 ++--- script/docker/rocketmq/prod/broker.conf | 10 +++ 4 files changed, 60 insertions(+), 40 deletions(-) create mode 100644 script/docker/rocketmq/prod/broker.conf diff --git a/ruoyi-admin/src/main/resources/application-prod.yml b/ruoyi-admin/src/main/resources/application-prod.yml index fbdf20bf8..acde6acd0 100644 --- a/ruoyi-admin/src/main/resources/application-prod.yml +++ b/ruoyi-admin/src/main/resources/application-prod.yml @@ -140,7 +140,7 @@ rocketmq: # 生产者配置 producer: # 生产者组名 - group: wzj_test + group: wzj_prod # 发送消息超时时间 send-message-timeout: 30000 # 消息最大长度 @@ -154,7 +154,7 @@ rocketmq: # 拉取消息最大数量 pull-batch-size: 10 # 消费者组 (系统模块) - group: wzj_test + group: wzj_prod # 是否启动消费者 enabled: true --- # mail 邮件发送 diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java index 0357fae7a..0e784f01f 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java @@ -1,6 +1,7 @@ package org.dromara.common.mq.utils; import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; @@ -17,23 +18,24 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static com.baomidou.mybatisplus.extension.ddl.DdlScriptErrorHandler.PrintlnLogErrorHandler.log; + /** * 带超时自动销毁功能的用户级RocketMQ消费者管理器 * 支持按用户隔离消费者,并自动清理超时未活动的实例 */ @Component +@Slf4j public class UserRocketMQConsumerManager { // 全局NameServer地址 - private String namesrvAddr="192.168.1.65:9876"; + @Value("${rocketmq.name-server}") + private String namesrvAddr; // 超时时间(毫秒),默认30分钟 private long timeoutMillis = 30 * 60 * 1000; @@ -101,36 +103,44 @@ public class UserRocketMQConsumerManager { * @param batchSize 每次拉取数量 * @return 拉取到的消息列表 */ - public List pullUserMessages(String userId , int batchSize) - throws MQClientException, InterruptedException, MQBrokerException, RemotingException { - validateUserId(userId); - UserConsumerContext context = getUserConsumerContext(userId); - - // 更新最后活动时间 - context.lastActiveTime = System.currentTimeMillis(); - - // 获取用户的消息队列 - Set mqs = context.consumer.fetchSubscribeMessageQueues(context.topic); + public List pullUserMessages(String userId , int batchSize){ List allMessages = new ArrayList<>(); - int current=0; - // 拉取所有队列的消息 - for (MessageQueue mq : mqs) { - PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize-current); - if (result.getPullStatus() == PullStatus.FOUND) { - allMessages.addAll(result.getMsgFoundList()); - // 更新用户的偏移量 - updateUserOffset(userId, mq, result.getNextBeginOffset()); - redisCache.zSetAdd(GlobalConstants.GLOBAL_OFFSET+userId,mq.getQueueId()+"",result.getNextBeginOffset()); - current+=result.getMsgFoundList().size(); - if(current==batchSize){ - break; - } - } + try{ + validateUserId(userId); + UserConsumerContext context = getUserConsumerContext(userId); + // 更新最后活动时间 + context.lastActiveTime = System.currentTimeMillis(); + + // 获取用户的消息队列 + Set mqs = context.consumer.fetchSubscribeMessageQueues(context.topic); + + int current=0; + // 拉取所有队列的消息 + for (MessageQueue mq : mqs) { + PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize-current); + if (result.getPullStatus() == PullStatus.FOUND) { + allMessages.addAll(result.getMsgFoundList()); + // 更新用户的偏移量 + updateUserOffset(userId, mq, result.getNextBeginOffset()); + redisCache.zSetAdd(GlobalConstants.GLOBAL_OFFSET+userId,mq.getQueueId()+"",result.getNextBeginOffset()); + current+=result.getMsgFoundList().size(); + if(current==batchSize){ + break; + } + } + + } + return allMessages; + + }catch (Exception e){ + log.error("用户[{}]拉取消息异常{}",userId,e.getMessage()); + return Collections.emptyList(); } - return allMessages; + + } /** diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java index 8f666a3c0..867a773c7 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java @@ -1,5 +1,6 @@ package com.wzj.soopin.content.service.impl; +import cn.hutool.core.collection.CollectionUtil; import com.baomidou.mybatisplus.core.metadata.IPage; import com.tencentcloudapi.common.exception.TencentCloudSDKException; import com.tencentcloudapi.vod.v20180717.VodClient; @@ -56,13 +57,12 @@ public class VlogPullServiceImpl implements IVlogPullService { messageExts = mqConsumerManager.pullUserMessages(loginUser.getUserId() + "", (int) page.getSize()); } - List ids = messageExts.stream().map(messageExt -> { - MQMessage mqMessage = JsonUtils.parseObject(messageExt.getBody(), MQMessage.class); - return mqMessage.getData().toString(); - }).collect(Collectors.toList()); - VlogBO vlogBO = new VlogBO(); - if (ids.size() > 0) { - + if(CollectionUtil.isNotEmpty(messageExts)){ + List ids = messageExts.stream().map(messageExt -> { + MQMessage mqMessage = JsonUtils.parseObject(messageExt.getBody(), MQMessage.class); + return mqMessage.getData().toString(); + }).collect(Collectors.toList()); + VlogBO vlogBO = new VlogBO(); vlogBO.setIds(ids); List vlogVOList = vlogService.getIndexVlogList(vlogBO); return page.setRecords(vlogVOList); @@ -75,7 +75,7 @@ public class VlogPullServiceImpl implements IVlogPullService { } //发出事件 //先临时取10条数据 - Page indexVlogVOPage = vlogService.getIndexVlogList(vlogBO, page); + Page indexVlogVOPage = vlogService.getIndexVlogList(null, page); //直接获取数据库数据,并要求push类推送数据 return indexVlogVOPage; } diff --git a/script/docker/rocketmq/prod/broker.conf b/script/docker/rocketmq/prod/broker.conf new file mode 100644 index 000000000..215493525 --- /dev/null +++ b/script/docker/rocketmq/prod/broker.conf @@ -0,0 +1,10 @@ +brokerClusterName = DefaultCluster +brokerName = broker-a +brokerId = 0 +# 单机模式设为异步主节点,满足基础消息收发需求 +brokerRole = ASYNC_MASTER +# 异步刷盘,平衡性能与可靠性(单机场景适用) +flushDiskType = ASYNC_FLUSH +# 关键配置:外部客户端(如Java程序)访问Broker的IP,必须是宿主机可被外部访问的IP +brokerIP1 = 43.143.227.203 +autoCreateNamespaceEnable = true