[fix]修改视频推送逻辑

This commit is contained in:
wangqx 2025-08-28 17:29:26 +08:00
parent 6c4ffb26e9
commit 50c6f60b25
9 changed files with 141 additions and 164 deletions

View File

@ -41,4 +41,10 @@ public interface GlobalConstants {
* 视频 redis key
*/
String VLOG_KEY = GLOBAL_REDIS_KEY + "vlog:";
/**
* 会员视频 redis key
*/
String GLOBAL_OFFSET = GLOBAL_REDIS_KEY + "offset:";
}

View File

@ -0,0 +1,63 @@
package org.dromara.common.mq.domain;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
* 消息事件
*
* @author ruoyi
*/
@Getter
public class VlogPushEvent extends ApplicationEvent {
private final Long userId;
private final String userIdStr;
/**
* 使用 Long 类型用户 ID 创建消息事件
*/
public VlogPushEvent(Object source, Long userId) {
super(source);
this.userId = userId;
this.userIdStr = userId != null ? String.valueOf(userId) : null;
}
/**
* 使用 Long 类型用户 ID 创建消息事件
*/
public VlogPushEvent(Long userId) {
super(null);
this.userId = userId;
this.userIdStr = null;
}
/**
* 使用 String 类型用户 ID 创建消息事件
*/
public VlogPushEvent(Object source, String userIdStr) {
super(source);
this.userIdStr = userIdStr;
// 尝试转换为 Long 类型
Long parsedUserId = null;
try {
if (userIdStr != null) {
parsedUserId = Long.parseLong(userIdStr);
}
} catch (NumberFormatException e) {
// 无法解析为 Long 类型保持 userId null
}
this.userId = parsedUserId;
}
/**
* 创建 MessageEvent 实例支持 String 类型用户ID
*
* @param source 事件源
* @param userIdStr 用户ID (String类型)
* @return MessageEvent 实例
*/
public static VlogPushEvent createWithStringUserId(Object source, String userIdStr) {
return new VlogPushEvent(source, userIdStr);
}
}

View File

@ -1,5 +1,6 @@
package org.dromara.common.mq.utils;
import lombok.AllArgsConstructor;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@ -10,6 +11,9 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.dromara.common.core.constant.GlobalConstants;
import org.dromara.common.redis.redis.RedisCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -29,9 +33,6 @@ import java.util.concurrent.TimeUnit;
@Component
public class UserRocketMQConsumerManager {
// 全局NameServer地址
@Value("rocketmq.name-server")
private String namesrvAddr="192.168.1.65:9876";
// 超时时间(毫秒)默认30分钟
@ -43,63 +44,16 @@ public class UserRocketMQConsumerManager {
// 存储用户与消费者的映射线程安全
private final Map<String, UserConsumerContext> userConsumerMap = new ConcurrentHashMap<>();
// 存储每个用户的消费偏移量用户ID -> 队列 -> 偏移量
private final Map<String, Map<MessageQueue, Long>> userOffsetMap = new ConcurrentHashMap<>();
// 定时任务执行器用于检测超时消费者
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final String consumerGroup="user_consumer_group";
private final String topic="MEMBER_VLOG_MSG";
/**
* 初始化管理器
*/
public UserRocketMQConsumerManager() {
if (namesrvAddr == null || namesrvAddr.trim().isEmpty()) {
throw new IllegalArgumentException("NameServer地址不能为空");
}
System.setProperty("rocketmq.namesrv.addr", namesrvAddr);
// 启动定时清理任务
startTimeoutChecker();
}
/**
* 启动超时检查定时任务
*/
private void startTimeoutChecker() {
scheduler.scheduleAtFixedRate(() -> {
try {
checkAndDestroyTimeoutConsumers();
} catch (Exception e) {
System.err.println("超时检查任务执行失败: " + e.getMessage());
}
}, checkIntervalMillis, checkIntervalMillis, TimeUnit.MILLISECONDS);
System.out.println("超时检查任务已启动,检查间隔: " + checkIntervalMillis + "ms超时时间: " + timeoutMillis + "ms");
}
@Autowired
private RedisCache redisCache;
/**
* 检查并销毁超时未活动的消费者
*/
private void checkAndDestroyTimeoutConsumers() {
long currentTime = System.currentTimeMillis();
List<String> timeoutUserIds = new ArrayList<>();
// 收集超时用户
for (Map.Entry<String, UserConsumerContext> entry : userConsumerMap.entrySet()) {
String userId = entry.getKey();
UserConsumerContext context = entry.getValue();
if (currentTime - context.lastActiveTime > timeoutMillis) {
timeoutUserIds.add(userId);
}
}
// 销毁超时消费者
for (String userId : timeoutUserIds) {
System.out.println("用户[" + userId + "]的消费者已超时未活动,自动销毁");
destroyConsumer(userId);
}
}
/**
* 为指定用户创建消费者实例
@ -118,15 +72,16 @@ public class UserRocketMQConsumerManager {
System.out.println("用户[" + userId + "]的消费者已存在,更新活动时间");
return false;
}
System.setProperty("rocketmq.namesrv.addr", namesrvAddr);
// 创建消费者
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
consumer.start();
// 初始化该用户的偏移量存储
userOffsetMap.putIfAbsent(userId, new ConcurrentHashMap<>());
redisCache.zSetAdd(GlobalConstants.GLOBAL_OFFSET+userId,userId,0);
// 存储用户消费者上下文
UserConsumerContext context = new UserConsumerContext(
@ -158,14 +113,21 @@ public class UserRocketMQConsumerManager {
Set<MessageQueue> mqs = context.consumer.fetchSubscribeMessageQueues(context.topic);
List<MessageExt> allMessages = new ArrayList<>();
int current=0;
// 拉取所有队列的消息
for (MessageQueue mq : mqs) {
PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize);
PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize-current);
if (result.getPullStatus() == PullStatus.FOUND) {
allMessages.addAll(result.getMsgFoundList());
// 更新用户的偏移量
updateUserOffset(userId, mq, result.getNextBeginOffset());
redisCache.zSetAdd(GlobalConstants.GLOBAL_OFFSET+userId,mq.getQueueId()+"",result.getNextBeginOffset());
current+=result.getMsgFoundList().size();
if(current==batchSize){
break;
}
}
}
return allMessages;
@ -189,41 +151,24 @@ public class UserRocketMQConsumerManager {
MessageQueue mq, int batchSize)
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 获取该用户在该队列的偏移量
long offset = getUserQueueOffset(userId, mq);
return consumer.pull(mq, userId, offset, batchSize);
Double offset = getUserQueueOffset(userId, mq);
if(offset==null){
offset=0.0;
}
return consumer.pull(mq, userId, offset.longValue(), batchSize);
}
/**
* 获取用户在指定队列的消费偏移量
*/
private long getUserQueueOffset(String userId, MessageQueue mq) {
Map<MessageQueue, Long> queueOffsetMap = userOffsetMap.get(userId);
return queueOffsetMap.getOrDefault(mq, 0L);
private Double getUserQueueOffset(String userId, MessageQueue mq) {
return redisCache.zSetScore(GlobalConstants.GLOBAL_OFFSET+userId,mq.getQueueId()+"") ;
}
/**
* 更新用户在指定队列的消费偏移量
*/
private void updateUserOffset(String userId, MessageQueue mq, long offset) {
Map<MessageQueue, Long> queueOffsetMap = userOffsetMap.get(userId);
queueOffsetMap.put(mq, offset);
}
/**
* 手动销毁指定用户的消费者
* @param userId 用户唯一标识
* @return 是否销毁成功
*/
public boolean destroyConsumer(String userId) {
validateUserId(userId);
UserConsumerContext context = userConsumerMap.remove(userId);
if (context != null) {
context.consumer.shutdown();
userOffsetMap.remove(userId);
System.out.println("用户[" + userId + "]的消费者已销毁,消费组: " + context.consumerGroup);
return true;
}
return false;
}
/**
@ -242,41 +187,6 @@ public class UserRocketMQConsumerManager {
}
}
/**
* 设置超时时间(毫秒)
*/
public void setTimeoutMillis(long timeoutMillis) {
if (timeoutMillis <= 0) {
throw new IllegalArgumentException("超时时间必须大于0");
}
this.timeoutMillis = timeoutMillis;
System.out.println("已更新超时时间: " + timeoutMillis + "ms");
}
/**
* 设置检查间隔(毫秒)
*/
public void setCheckIntervalMillis(long checkIntervalMillis) {
if (checkIntervalMillis <= 0 || checkIntervalMillis > timeoutMillis) {
throw new IllegalArgumentException("检查间隔必须大于0且小于等于超时时间");
}
this.checkIntervalMillis = checkIntervalMillis;
// 重新启动定时任务
scheduler.shutdownNow();
startTimeoutChecker();
}
/**
* 关闭管理器释放所有资源
*/
public void shutdown() {
scheduler.shutdown();
// 销毁所有消费者
for (String userId : new ArrayList<>(userConsumerMap.keySet())) {
destroyConsumer(userId);
}
System.out.println("消费者管理器已关闭");
}
/**
* 用户消费者上下文存储消费者实例及活动时间
@ -298,36 +208,5 @@ public class UserRocketMQConsumerManager {
}
}
// 测试示例
public static void main(String[] args) throws InterruptedException {
try {
// 初始化管理器设置超时时间为10秒检查间隔为5秒
UserRocketMQConsumerManager manager = new UserRocketMQConsumerManager( );
manager.setTimeoutMillis(10 * 1000);
manager.setCheckIntervalMillis(5 * 1000);
// 为用户1创建消费者
manager.createConsumer("user1", "user1_group", "user_topic");
System.out.println("当前注册用户: " + manager.getRegisteredUserIds());
// 用户1拉取消息更新活动时间
System.out.println("\n用户1首次拉取消息:");
manager.pullUserMessages("user1", 5);
// 等待6秒未超时
Thread.sleep(6000);
System.out.println("\n6秒后注册用户: " + manager.getRegisteredUserIds());
// 再等待5秒累计11秒超过10秒超时
Thread.sleep(5000);
System.out.println("\n11秒后注册用户: " + manager.getRegisteredUserIds());
// 关闭管理器
manager.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -315,6 +315,7 @@ public class RedisCache
redisTemplate.opsForZSet().incrementScore(key, value, -delta);
}
/**
* 是否有这个成员
* @param key

View File

@ -0,0 +1,20 @@
package com.wzj.soopin.content.event;
import com.wzj.soopin.content.service.IVlogPushService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.mq.domain.VlogPushEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class VlogPushEventListener {
private final IVlogPushService vlogPushService;
@EventListener
public void handleNeedPushVlogEvent(VlogPushEvent event) {
log.info("收到需要推送的视频事件用户ID{}", event.getUserId());
vlogPushService.pushVlogToMember(100, event.getUserId());
}
}

View File

@ -140,5 +140,6 @@ public interface VlogService extends IService<Vlog> {
int readVlog(Long memberId, String vlogId);
List<IndexVlogVO> getIndexVlogList(Map<String, Object> paramMap);
Page<IndexVlogVO> getIndexVlogList(Map<String, Object> paramMap,Page page);
}

View File

@ -1,6 +1,5 @@
package com.wzj.soopin.content.service.impl;
import com.google.gson.JsonObject;
import com.wzj.soopin.content.service.IVlogPullService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -10,12 +9,12 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.dromara.common.core.domain.model.LoginUser;
import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.mq.domain.MQMessage;
import org.dromara.common.mq.domain.VlogPushEvent;
import org.dromara.common.mq.utils.UserRocketMQConsumerManager;
import org.dromara.common.satoken.utils.LoginHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
@ -31,6 +30,7 @@ public class VlogPullServiceImpl implements IVlogPullService {
private final UserRocketMQConsumerManager mqConsumerManager;
private final VlogService vlogService;
private final ApplicationEventPublisher eventPublisher;
@Override
public Page<IndexVlogVO> page(Page<IndexVlogVO> page) {
@ -45,18 +45,24 @@ public class VlogPullServiceImpl implements IVlogPullService {
messageExts = mqConsumerManager.pullUserMessages(loginUser.getUserId() + "", (int) page.getSize());
}
List<String> ids=messageExts.stream().map(messageExt -> {
List<String> ids = messageExts.stream().map(messageExt -> {
MQMessage mqMessage = JsonUtils.parseObject(messageExt.getBody(), MQMessage.class);
return mqMessage.getData().toString();
}).collect(Collectors.toList());
Map<String, Object> paramMap = new HashMap<>();
if (ids.size() > 0) {
if(ids.size()>0){
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("ids", ids);
List<IndexVlogVO> vlogVOList = vlogService.getIndexVlogList(paramMap);
return page.setRecords(vlogVOList);
}else{
return page;
} else {
//发出事件
eventPublisher.publishEvent(new VlogPushEvent(loginUser.getUserId()));
//先临时取10条数据
Page<IndexVlogVO> indexVlogVOPage = vlogService.getIndexVlogList(paramMap, page);
//直接获取数据库数据并要求push类推送数据
return indexVlogVOPage;
}
} catch (Exception e) {

View File

@ -4,32 +4,26 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wzj.soopin.content.domain.po.Vlog;
import com.wzj.soopin.content.domain.vo.IndexVlogVO;
import com.wzj.soopin.content.domain.vo.VlogerVO;
import com.wzj.soopin.content.service.IVlogPushService;
import com.wzj.soopin.content.service.VlogService;
import com.wzj.soopin.member.domain.po.Member;
import com.wzj.soopin.member.domain.vo.MemberVO;
import com.wzj.soopin.member.service.IMemberService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.mq.domain.MQMessage;
import org.dromara.common.mq.utils.MqUtil;
import org.dromara.common.redis.redis.RedisCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static com.wzj.soopin.content.domain.base.BaseInfoProperties.*;
import static com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_BE_LIKED_COUNTS;
@Service
@AllArgsConstructor

View File

@ -618,6 +618,13 @@ public class VlogServiceImpl extends ServiceImpl<VlogMapper, Vlog> implements Vl
return indexVlogVOPage;
}
@Override
public Page<IndexVlogVO> getIndexVlogList(Map<String, Object> paramMap, Page page) {
Page<IndexVlogVO> indexVlogVOPage = vlogMapperCustom.getIndexVlogList(paramMap,page);
fillRedisColumn(indexVlogVOPage.getRecords());
return indexVlogVOPage;
}
private void fillRedisColumn(List<IndexVlogVO> vlogList) {
LoginUser user = LoginHelper.getLoginUser();
vlogList.parallelStream().forEach(vlog -> {