From 50c6f60b259a9ca867f1b3cd3f4841683407c899 Mon Sep 17 00:00:00 2001 From: wangqx Date: Thu, 28 Aug 2025 17:29:26 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=E4=BF=AE=E6=94=B9=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E6=8E=A8=E9=80=81=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/core/constant/GlobalConstants.java | 6 + .../common/mq/domain/VlogPushEvent.java | 63 +++++++ .../mq/utils/UserRocketMQConsumerManager.java | 169 +++--------------- .../common/redis/redis/RedisCache.java | 1 + .../content/event/VlogPushEventListener.java | 20 +++ .../soopin/content/service/VlogService.java | 1 + .../service/impl/VlogPullServiceImpl.java | 22 ++- .../service/impl/VlogPushServiceImpl.java | 16 +- .../content/service/impl/VlogServiceImpl.java | 7 + 9 files changed, 141 insertions(+), 164 deletions(-) create mode 100644 ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/domain/VlogPushEvent.java create mode 100644 ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/event/VlogPushEventListener.java diff --git a/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/constant/GlobalConstants.java b/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/constant/GlobalConstants.java index d37da38e3..8692c4c55 100644 --- a/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/constant/GlobalConstants.java +++ b/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/constant/GlobalConstants.java @@ -41,4 +41,10 @@ public interface GlobalConstants { * 视频 redis key */ String VLOG_KEY = GLOBAL_REDIS_KEY + "vlog:"; + + /** + * 会员视频 redis key + */ + String GLOBAL_OFFSET = GLOBAL_REDIS_KEY + "offset:"; + } diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/domain/VlogPushEvent.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/domain/VlogPushEvent.java new file mode 100644 index 000000000..592e101bb --- /dev/null +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/domain/VlogPushEvent.java @@ -0,0 +1,63 @@ +package org.dromara.common.mq.domain; + +import lombok.Getter; +import org.springframework.context.ApplicationEvent; + +/** + * 消息事件 + * + * @author ruoyi + */ +@Getter +public class VlogPushEvent extends ApplicationEvent { + + private final Long userId; + private final String userIdStr; + + /** + * 使用 Long 类型用户 ID 创建消息事件 + */ + public VlogPushEvent(Object source, Long userId) { + super(source); + this.userId = userId; + this.userIdStr = userId != null ? String.valueOf(userId) : null; + } + /** + * 使用 Long 类型用户 ID 创建消息事件 + */ + public VlogPushEvent(Long userId) { + super(null); + this.userId = userId; + this.userIdStr = null; + } + + /** + * 使用 String 类型用户 ID 创建消息事件 + */ + public VlogPushEvent(Object source, String userIdStr) { + super(source); + this.userIdStr = userIdStr; + + // 尝试转换为 Long 类型 + Long parsedUserId = null; + try { + if (userIdStr != null) { + parsedUserId = Long.parseLong(userIdStr); + } + } catch (NumberFormatException e) { + // 无法解析为 Long 类型,保持 userId 为 null + } + this.userId = parsedUserId; + } + + /** + * 创建 MessageEvent 实例(支持 String 类型用户ID) + * + * @param source 事件源 + * @param userIdStr 用户ID (String类型) + * @return MessageEvent 实例 + */ + public static VlogPushEvent createWithStringUserId(Object source, String userIdStr) { + return new VlogPushEvent(source, userIdStr); + } +} diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java index 80f40478d..0357fae7a 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java @@ -1,5 +1,6 @@ package org.dromara.common.mq.utils; +import lombok.AllArgsConstructor; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; @@ -10,6 +11,9 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.dromara.common.core.constant.GlobalConstants; +import org.dromara.common.redis.redis.RedisCache; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -29,9 +33,6 @@ import java.util.concurrent.TimeUnit; @Component public class UserRocketMQConsumerManager { // 全局NameServer地址 - @Value("rocketmq.name-server") - - private String namesrvAddr="192.168.1.65:9876"; // 超时时间(毫秒),默认30分钟 @@ -43,63 +44,16 @@ public class UserRocketMQConsumerManager { // 存储用户与消费者的映射(线程安全) private final Map userConsumerMap = new ConcurrentHashMap<>(); - // 存储每个用户的消费偏移量(用户ID -> 队列 -> 偏移量) - private final Map> userOffsetMap = new ConcurrentHashMap<>(); - // 定时任务执行器,用于检测超时消费者 private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final String consumerGroup="user_consumer_group"; private final String topic="MEMBER_VLOG_MSG"; - /** - * 初始化管理器 - */ - public UserRocketMQConsumerManager() { - if (namesrvAddr == null || namesrvAddr.trim().isEmpty()) { - throw new IllegalArgumentException("NameServer地址不能为空"); - } - System.setProperty("rocketmq.namesrv.addr", namesrvAddr); - // 启动定时清理任务 - startTimeoutChecker(); - } - /** - * 启动超时检查定时任务 - */ - private void startTimeoutChecker() { - scheduler.scheduleAtFixedRate(() -> { - try { - checkAndDestroyTimeoutConsumers(); - } catch (Exception e) { - System.err.println("超时检查任务执行失败: " + e.getMessage()); - } - }, checkIntervalMillis, checkIntervalMillis, TimeUnit.MILLISECONDS); - System.out.println("超时检查任务已启动,检查间隔: " + checkIntervalMillis + "ms,超时时间: " + timeoutMillis + "ms"); - } + @Autowired + private RedisCache redisCache; - /** - * 检查并销毁超时未活动的消费者 - */ - private void checkAndDestroyTimeoutConsumers() { - long currentTime = System.currentTimeMillis(); - List timeoutUserIds = new ArrayList<>(); - // 收集超时用户 - for (Map.Entry entry : userConsumerMap.entrySet()) { - String userId = entry.getKey(); - UserConsumerContext context = entry.getValue(); - - if (currentTime - context.lastActiveTime > timeoutMillis) { - timeoutUserIds.add(userId); - } - } - - // 销毁超时消费者 - for (String userId : timeoutUserIds) { - System.out.println("用户[" + userId + "]的消费者已超时未活动,自动销毁"); - destroyConsumer(userId); - } - } /** * 为指定用户创建消费者实例 @@ -118,15 +72,16 @@ public class UserRocketMQConsumerManager { System.out.println("用户[" + userId + "]的消费者已存在,更新活动时间"); return false; } - + System.setProperty("rocketmq.namesrv.addr", namesrvAddr); // 创建消费者 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()); consumer.start(); + // 初始化该用户的偏移量存储 - userOffsetMap.putIfAbsent(userId, new ConcurrentHashMap<>()); + redisCache.zSetAdd(GlobalConstants.GLOBAL_OFFSET+userId,userId,0); // 存储用户消费者上下文 UserConsumerContext context = new UserConsumerContext( @@ -158,14 +113,21 @@ public class UserRocketMQConsumerManager { Set mqs = context.consumer.fetchSubscribeMessageQueues(context.topic); List allMessages = new ArrayList<>(); + int current=0; // 拉取所有队列的消息 for (MessageQueue mq : mqs) { - PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize); + PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize-current); if (result.getPullStatus() == PullStatus.FOUND) { allMessages.addAll(result.getMsgFoundList()); // 更新用户的偏移量 updateUserOffset(userId, mq, result.getNextBeginOffset()); + redisCache.zSetAdd(GlobalConstants.GLOBAL_OFFSET+userId,mq.getQueueId()+"",result.getNextBeginOffset()); + current+=result.getMsgFoundList().size(); + if(current==batchSize){ + break; + } } + } return allMessages; @@ -189,41 +151,24 @@ public class UserRocketMQConsumerManager { MessageQueue mq, int batchSize) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { // 获取该用户在该队列的偏移量 - long offset = getUserQueueOffset(userId, mq); - return consumer.pull(mq, userId, offset, batchSize); + Double offset = getUserQueueOffset(userId, mq); + if(offset==null){ + offset=0.0; + } + return consumer.pull(mq, userId, offset.longValue(), batchSize); } /** * 获取用户在指定队列的消费偏移量 */ - private long getUserQueueOffset(String userId, MessageQueue mq) { - Map queueOffsetMap = userOffsetMap.get(userId); - return queueOffsetMap.getOrDefault(mq, 0L); + private Double getUserQueueOffset(String userId, MessageQueue mq) { + return redisCache.zSetScore(GlobalConstants.GLOBAL_OFFSET+userId,mq.getQueueId()+"") ; } /** * 更新用户在指定队列的消费偏移量 */ private void updateUserOffset(String userId, MessageQueue mq, long offset) { - Map queueOffsetMap = userOffsetMap.get(userId); - queueOffsetMap.put(mq, offset); - } - - /** - * 手动销毁指定用户的消费者 - * @param userId 用户唯一标识 - * @return 是否销毁成功 - */ - public boolean destroyConsumer(String userId) { - validateUserId(userId); - UserConsumerContext context = userConsumerMap.remove(userId); - if (context != null) { - context.consumer.shutdown(); - userOffsetMap.remove(userId); - System.out.println("用户[" + userId + "]的消费者已销毁,消费组: " + context.consumerGroup); - return true; - } - return false; } /** @@ -242,41 +187,6 @@ public class UserRocketMQConsumerManager { } } - /** - * 设置超时时间(毫秒) - */ - public void setTimeoutMillis(long timeoutMillis) { - if (timeoutMillis <= 0) { - throw new IllegalArgumentException("超时时间必须大于0"); - } - this.timeoutMillis = timeoutMillis; - System.out.println("已更新超时时间: " + timeoutMillis + "ms"); - } - - /** - * 设置检查间隔(毫秒) - */ - public void setCheckIntervalMillis(long checkIntervalMillis) { - if (checkIntervalMillis <= 0 || checkIntervalMillis > timeoutMillis) { - throw new IllegalArgumentException("检查间隔必须大于0且小于等于超时时间"); - } - this.checkIntervalMillis = checkIntervalMillis; - // 重新启动定时任务 - scheduler.shutdownNow(); - startTimeoutChecker(); - } - - /** - * 关闭管理器,释放所有资源 - */ - public void shutdown() { - scheduler.shutdown(); - // 销毁所有消费者 - for (String userId : new ArrayList<>(userConsumerMap.keySet())) { - destroyConsumer(userId); - } - System.out.println("消费者管理器已关闭"); - } /** * 用户消费者上下文(存储消费者实例及活动时间) @@ -298,36 +208,5 @@ public class UserRocketMQConsumerManager { } } - // 测试示例 - public static void main(String[] args) throws InterruptedException { - try { - // 初始化管理器(设置超时时间为10秒,检查间隔为5秒) - UserRocketMQConsumerManager manager = new UserRocketMQConsumerManager( ); - manager.setTimeoutMillis(10 * 1000); - manager.setCheckIntervalMillis(5 * 1000); - - // 为用户1创建消费者 - manager.createConsumer("user1", "user1_group", "user_topic"); - System.out.println("当前注册用户: " + manager.getRegisteredUserIds()); - - // 用户1拉取消息(更新活动时间) - System.out.println("\n用户1首次拉取消息:"); - manager.pullUserMessages("user1", 5); - - // 等待6秒(未超时) - Thread.sleep(6000); - System.out.println("\n6秒后注册用户: " + manager.getRegisteredUserIds()); - - // 再等待5秒(累计11秒,超过10秒超时) - Thread.sleep(5000); - System.out.println("\n11秒后注册用户: " + manager.getRegisteredUserIds()); - - // 关闭管理器 - manager.shutdown(); - - } catch (Exception e) { - e.printStackTrace(); - } - } } diff --git a/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/redis/RedisCache.java b/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/redis/RedisCache.java index f7781add8..0ccabc737 100644 --- a/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/redis/RedisCache.java +++ b/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/redis/RedisCache.java @@ -315,6 +315,7 @@ public class RedisCache redisTemplate.opsForZSet().incrementScore(key, value, -delta); } + /** * 是否有这个成员 * @param key diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/event/VlogPushEventListener.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/event/VlogPushEventListener.java new file mode 100644 index 000000000..9e0effb7d --- /dev/null +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/event/VlogPushEventListener.java @@ -0,0 +1,20 @@ +package com.wzj.soopin.content.event; + +import com.wzj.soopin.content.service.IVlogPushService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.mq.domain.VlogPushEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class VlogPushEventListener { + private final IVlogPushService vlogPushService; + @EventListener + public void handleNeedPushVlogEvent(VlogPushEvent event) { + log.info("收到需要推送的视频事件,用户ID:{}", event.getUserId()); + vlogPushService.pushVlogToMember(100, event.getUserId()); + } +} diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/VlogService.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/VlogService.java index d12f3ebd1..bed9f7551 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/VlogService.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/VlogService.java @@ -140,5 +140,6 @@ public interface VlogService extends IService { int readVlog(Long memberId, String vlogId); List getIndexVlogList(Map paramMap); + Page getIndexVlogList(Map paramMap,Page page); } diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java index 3e6199d2e..e019788d3 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java @@ -1,6 +1,5 @@ package com.wzj.soopin.content.service.impl; -import com.google.gson.JsonObject; import com.wzj.soopin.content.service.IVlogPullService; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -10,12 +9,12 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.dromara.common.core.domain.model.LoginUser; -import org.dromara.common.core.exception.ServiceException; import org.dromara.common.json.utils.JsonUtils; import org.dromara.common.mq.domain.MQMessage; +import org.dromara.common.mq.domain.VlogPushEvent; import org.dromara.common.mq.utils.UserRocketMQConsumerManager; import org.dromara.common.satoken.utils.LoginHelper; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -31,6 +30,7 @@ public class VlogPullServiceImpl implements IVlogPullService { private final UserRocketMQConsumerManager mqConsumerManager; private final VlogService vlogService; + private final ApplicationEventPublisher eventPublisher; @Override public Page page(Page page) { @@ -45,18 +45,24 @@ public class VlogPullServiceImpl implements IVlogPullService { messageExts = mqConsumerManager.pullUserMessages(loginUser.getUserId() + "", (int) page.getSize()); } - List ids=messageExts.stream().map(messageExt -> { + List ids = messageExts.stream().map(messageExt -> { MQMessage mqMessage = JsonUtils.parseObject(messageExt.getBody(), MQMessage.class); return mqMessage.getData().toString(); }).collect(Collectors.toList()); + Map paramMap = new HashMap<>(); + if (ids.size() > 0) { - if(ids.size()>0){ - Map paramMap = new HashMap<>(); paramMap.put("ids", ids); List vlogVOList = vlogService.getIndexVlogList(paramMap); return page.setRecords(vlogVOList); - }else{ - return page; + } else { + + //发出事件 + eventPublisher.publishEvent(new VlogPushEvent(loginUser.getUserId())); + //先临时取10条数据 + Page indexVlogVOPage = vlogService.getIndexVlogList(paramMap, page); + //直接获取数据库数据,并要求push类推送数据 + return indexVlogVOPage; } } catch (Exception e) { diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java index e852e9808..e9e94628b 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java @@ -4,32 +4,26 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import com.wzj.soopin.content.domain.po.Vlog; -import com.wzj.soopin.content.domain.vo.IndexVlogVO; -import com.wzj.soopin.content.domain.vo.VlogerVO; import com.wzj.soopin.content.service.IVlogPushService; import com.wzj.soopin.content.service.VlogService; import com.wzj.soopin.member.domain.po.Member; -import com.wzj.soopin.member.domain.vo.MemberVO; import com.wzj.soopin.member.service.IMemberService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.dromara.common.core.utils.MapstructUtils; import org.dromara.common.mq.domain.MQMessage; import org.dromara.common.mq.utils.MqUtil; import org.dromara.common.redis.redis.RedisCache; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.time.LocalDateTime; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.stream.Collectors; -import static com.wzj.soopin.content.domain.base.BaseInfoProperties.*; +import static com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_BE_LIKED_COUNTS; @Service @AllArgsConstructor diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java index 37300427e..fd43a373b 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java @@ -618,6 +618,13 @@ public class VlogServiceImpl extends ServiceImpl implements Vl return indexVlogVOPage; } + @Override + public Page getIndexVlogList(Map paramMap, Page page) { + Page indexVlogVOPage = vlogMapperCustom.getIndexVlogList(paramMap,page); + fillRedisColumn(indexVlogVOPage.getRecords()); + return indexVlogVOPage; + } + private void fillRedisColumn(List vlogList) { LoginUser user = LoginHelper.getLoginUser(); vlogList.parallelStream().forEach(vlog -> {