[fix]修改生产环境配置,修改消费者配置

This commit is contained in:
wangqx 2025-09-18 16:57:57 +08:00
parent fd5f3fc527
commit 895421fa8b
4 changed files with 60 additions and 40 deletions

View File

@ -140,7 +140,7 @@ rocketmq:
# 生产者配置
producer:
# 生产者组名
group: wzj_test
group: wzj_prod
# 发送消息超时时间
send-message-timeout: 30000
# 消息最大长度
@ -154,7 +154,7 @@ rocketmq:
# 拉取消息最大数量
pull-batch-size: 10
# 消费者组 (系统模块)
group: wzj_test
group: wzj_prod
# 是否启动消费者
enabled: true
--- # mail 邮件发送

View File

@ -1,6 +1,7 @@
package org.dromara.common.mq.utils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@ -17,23 +18,24 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.baomidou.mybatisplus.extension.ddl.DdlScriptErrorHandler.PrintlnLogErrorHandler.log;
/**
* 带超时自动销毁功能的用户级RocketMQ消费者管理器
* 支持按用户隔离消费者并自动清理超时未活动的实例
*/
@Component
@Slf4j
public class UserRocketMQConsumerManager {
// 全局NameServer地址
private String namesrvAddr="192.168.1.65:9876";
@Value("${rocketmq.name-server}")
private String namesrvAddr;
// 超时时间(毫秒)默认30分钟
private long timeoutMillis = 30 * 60 * 1000;
@ -101,36 +103,44 @@ public class UserRocketMQConsumerManager {
* @param batchSize 每次拉取数量
* @return 拉取到的消息列表
*/
public List<MessageExt> pullUserMessages(String userId , int batchSize)
throws MQClientException, InterruptedException, MQBrokerException, RemotingException {
validateUserId(userId);
UserConsumerContext context = getUserConsumerContext(userId);
// 更新最后活动时间
context.lastActiveTime = System.currentTimeMillis();
// 获取用户的消息队列
Set<MessageQueue> mqs = context.consumer.fetchSubscribeMessageQueues(context.topic);
public List<MessageExt> pullUserMessages(String userId , int batchSize){
List<MessageExt> allMessages = new ArrayList<>();
int current=0;
// 拉取所有队列的消息
for (MessageQueue mq : mqs) {
PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize-current);
if (result.getPullStatus() == PullStatus.FOUND) {
allMessages.addAll(result.getMsgFoundList());
// 更新用户的偏移量
updateUserOffset(userId, mq, result.getNextBeginOffset());
redisCache.zSetAdd(GlobalConstants.GLOBAL_OFFSET+userId,mq.getQueueId()+"",result.getNextBeginOffset());
current+=result.getMsgFoundList().size();
if(current==batchSize){
break;
}
}
try{
validateUserId(userId);
UserConsumerContext context = getUserConsumerContext(userId);
// 更新最后活动时间
context.lastActiveTime = System.currentTimeMillis();
// 获取用户的消息队列
Set<MessageQueue> mqs = context.consumer.fetchSubscribeMessageQueues(context.topic);
int current=0;
// 拉取所有队列的消息
for (MessageQueue mq : mqs) {
PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize-current);
if (result.getPullStatus() == PullStatus.FOUND) {
allMessages.addAll(result.getMsgFoundList());
// 更新用户的偏移量
updateUserOffset(userId, mq, result.getNextBeginOffset());
redisCache.zSetAdd(GlobalConstants.GLOBAL_OFFSET+userId,mq.getQueueId()+"",result.getNextBeginOffset());
current+=result.getMsgFoundList().size();
if(current==batchSize){
break;
}
}
}
return allMessages;
}catch (Exception e){
log.error("用户[{}]拉取消息异常{}",userId,e.getMessage());
return Collections.emptyList();
}
return allMessages;
}
/**

View File

@ -1,5 +1,6 @@
package com.wzj.soopin.content.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import com.tencentcloudapi.vod.v20180717.VodClient;
@ -56,13 +57,12 @@ public class VlogPullServiceImpl implements IVlogPullService {
messageExts = mqConsumerManager.pullUserMessages(loginUser.getUserId() + "", (int) page.getSize());
}
List<String> ids = messageExts.stream().map(messageExt -> {
MQMessage mqMessage = JsonUtils.parseObject(messageExt.getBody(), MQMessage.class);
return mqMessage.getData().toString();
}).collect(Collectors.toList());
VlogBO vlogBO = new VlogBO();
if (ids.size() > 0) {
if(CollectionUtil.isNotEmpty(messageExts)){
List<String> ids = messageExts.stream().map(messageExt -> {
MQMessage mqMessage = JsonUtils.parseObject(messageExt.getBody(), MQMessage.class);
return mqMessage.getData().toString();
}).collect(Collectors.toList());
VlogBO vlogBO = new VlogBO();
vlogBO.setIds(ids);
List<IndexVlogVO> vlogVOList = vlogService.getIndexVlogList(vlogBO);
return page.setRecords(vlogVOList);
@ -75,7 +75,7 @@ public class VlogPullServiceImpl implements IVlogPullService {
}
//发出事件
//先临时取10条数据
Page<IndexVlogVO> indexVlogVOPage = vlogService.getIndexVlogList(vlogBO, page);
Page<IndexVlogVO> indexVlogVOPage = vlogService.getIndexVlogList(null, page);
//直接获取数据库数据并要求push类推送数据
return indexVlogVOPage;
}

View File

@ -0,0 +1,10 @@
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# 单机模式设为异步主节点,满足基础消息收发需求
brokerRole = ASYNC_MASTER
# 异步刷盘,平衡性能与可靠性(单机场景适用)
flushDiskType = ASYNC_FLUSH
# 关键配置外部客户端如Java程序访问Broker的IP必须是宿主机可被外部访问的IP
brokerIP1 = 43.143.227.203
autoCreateNamespaceEnable = true