[fix]修改视频拉取

This commit is contained in:
王庆祥 2025-09-30 22:38:13 +08:00
parent b7043ebe17
commit e18f4ced63
7 changed files with 32 additions and 29 deletions

View File

@ -20,6 +20,18 @@ public class RocketMQConfig {
*/
public static final String TOPIC_IM_MSG = "TOPIC_IM_MSG";
/**
* 系统消息消费者组
*/
public static final String GROUP_IM_MSG = "GROUP_IM_MSG";
/**
* 系统消息消费者组
*/
public static final String GROUP_SYS_MSG = "CONSUMER_GROUP_SYS_MSG";
/**
* 系统消息消费者组
*/
@ -33,9 +45,15 @@ public class RocketMQConfig {
/**
* 视频上传主题
*/
public static final String VLOG_UPLOAD_TOPIC = "VLOG_UPLOAD_TOPIC";
public static final String TOPIC_VLOG_MSG = "TOPIC_VLOG_MSG";
public static final String VLOG_UPLOAD_GROUP = "VLOG_UPLOAD_GROUP";
/**
* 视频上传消费者组
*/
public static final String GROUP_VLOG_MSG = "GROUP_VLOG_MSG";
public static final String VLOG_UPLOAD_TAG= "upload";
/**
* 视频上传标签
*/
public static final String TAG_VLOG_MSG = "VLOG_MSG";
}

View File

@ -42,21 +42,6 @@ public class MqUtil implements ApplicationContextAware {
template = applicationContext.getBean(RocketMQTemplate.class);
}
public void init() throws Exception {
// 1. 创建消费者实例指定消费者组
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("YOUR_PULL_CONSUMER_GROUP");
consumer.setNamesrvAddr("localhost:9876"); // 替换为您的NameServer地址
consumer.subscribe("YOUR_TOPIC", "*");
consumer.start();
// 3. 设置从何处开始消费首次启动时
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 4. 订阅主题和Tag
consumer.subscribe("YOUR_TOPIC", "*"); // 使用*消费所有Tag
// 5. 启动消费者
consumer.start();
System.out.println("LitePullConsumer 启动成功");
}
//
public static void sendMessage(String destination, MQMessage message) {

View File

@ -13,6 +13,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.dromara.common.core.constant.GlobalConstants;
import org.dromara.common.mq.config.RocketMQConfig;
import org.dromara.common.redis.redis.RedisCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -49,8 +50,6 @@ public class UserRocketMQConsumerManager {
// 定时任务执行器用于检测超时消费者
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final String consumerGroup="user_consumer_group";
private final String topic="MEMBER_VLOG_MSG";
@Autowired
private RedisCache redisCache;
@ -149,7 +148,7 @@ public class UserRocketMQConsumerManager {
private UserConsumerContext getUserConsumerContext(String userId) throws MQClientException {
UserConsumerContext context = userConsumerMap.get(userId);
if (context == null) {
createConsumer(userId, consumerGroup,topic);
createConsumer(userId, RocketMQConfig.GROUP_VLOG_MSG, RocketMQConfig.TOPIC_VLOG_MSG);
}
return userConsumerMap.get(userId);
}

View File

@ -35,9 +35,9 @@ import java.nio.file.Path;
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = RocketMQConfig.VLOG_UPLOAD_TOPIC,
consumerGroup = RocketMQConfig.VLOG_UPLOAD_GROUP,
selectorExpression = RocketMQConfig.VLOG_UPLOAD_TAG
topic = RocketMQConfig.TOPIC_VLOG_MSG,
consumerGroup = RocketMQConfig.GROUP_VLOG_MSG,
selectorExpression = RocketMQConfig.TAG_VLOG_MSG
)
public class VlogUploadMessageConsumer implements RocketMQListener<MessageExt> {
private final QcCloud qcCloud;

View File

@ -10,6 +10,7 @@ import com.wzj.soopin.content.service.VlogService;
import com.wzj.soopin.member.service.IMemberService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.mq.config.RocketMQConfig;
import org.dromara.common.mq.domain.MQMessage;
import org.dromara.common.mq.enums.MQMessageType;
import org.dromara.common.mq.utils.MqUtil;
@ -81,7 +82,7 @@ public class VlogPushServiceImpl implements IVlogPushService {
.source("vlog_service")
.sendTime(LocalDateTime.now())
.build();
MqUtil.sendMessage("MEMBER_VLOG_MSG",HOT_VLOG_TAG, message);
MqUtil.sendMessage(RocketMQConfig.TOPIC_VLOG_MSG,HOT_VLOG_TAG, message);
});
}else{
@ -92,14 +93,14 @@ public class VlogPushServiceImpl implements IVlogPushService {
vlogPage.getRecords().stream().forEach(vlogId -> {
//将数据发送的mq的热点数据队列
MQMessage message = MQMessage.builder()
.topic("MEMBER_VLOG_MSG")
.topic(RocketMQConfig.TOPIC_VLOG_MSG)
.tag(tag)
.messageType(MQMessageType.VLOG.name())
.data(vlogId)
.source("vlog_service")
.sendTime(LocalDateTime.now())
.build();
MqUtil.sendMessage("MEMBER_VLOG_MSG"+":"+tag, message);
MqUtil.sendMessage(RocketMQConfig.TOPIC_VLOG_MSG+":"+tag, message);
});
}

View File

@ -139,7 +139,7 @@ public class VlogServiceImpl extends ServiceImpl<VlogMapper, Vlog> implements Vl
.tag("upload")
.sendTime(LocalDateTime.now())
.build();
MqUtil.sendMessage(RocketMQConfig.VLOG_UPLOAD_TOPIC + ":" + "upload", message);
MqUtil.sendMessage(RocketMQConfig.TOPIC_VLOG_MSG + ":" + RocketMQConfig.TAG_VLOG_MSG, message);
}
@Override

View File

@ -21,7 +21,7 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = RocketMQConfig.TOPIC_IM_MSG,
consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG,
consumerGroup = RocketMQConfig.GROUP_IM_MSG,
selectorExpression = "*"
// ackMode = AckMode.MANUAL
)