diff --git a/ruoyi-admin/src/main/java/org/dromara/app/AppCommentController.java b/ruoyi-admin/src/main/java/org/dromara/app/AppCommentController.java index d8e32d6dd..4910a4f68 100644 --- a/ruoyi-admin/src/main/java/org/dromara/app/AppCommentController.java +++ b/ruoyi-admin/src/main/java/org/dromara/app/AppCommentController.java @@ -14,6 +14,7 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.domain.R; +import org.dromara.common.redis.redis.RedisCache; import org.dromara.common.satoken.utils.LoginHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -32,6 +33,9 @@ public class AppCommentController { private RedisOperator redis; @Autowired private MsgService msgService; + @Autowired + private RedisCache redisCache; + @ApiOperation("查询视频评论列表") @PostMapping("/vlogComments") public R> queryVlogComments( @@ -66,7 +70,7 @@ public class AppCommentController { commentService.createComment(bo); // 2) 短视频评论总数 +1(Redis 优先) - redis.increment(BaseInfoProperties.REDIS_VLOG_COMMENT_COUNTS + ":" + bo.getVlogId(), 1); + redisCache.zSetIncrement(BaseInfoProperties.REDIS_VLOG_COMMENT_COUNTS , bo.getVlogId(), 1); // 3) 发送站内消息:根评论 -> 通知视频作者;回复评论 -> 通知被回复用户 if ("0".equals(bo.getFatherCommentId())) { diff --git a/ruoyi-admin/src/main/java/org/dromara/app/AppVlogController.java b/ruoyi-admin/src/main/java/org/dromara/app/AppVlogController.java index 716f8f66a..680cc386d 100644 --- a/ruoyi-admin/src/main/java/org/dromara/app/AppVlogController.java +++ b/ruoyi-admin/src/main/java/org/dromara/app/AppVlogController.java @@ -6,6 +6,7 @@ import com.wzj.soopin.content.domain.bo.*; import com.wzj.soopin.content.domain.po.MyLikedVlog; import com.wzj.soopin.content.domain.po.Vlog; import com.wzj.soopin.content.domain.vo.IndexVlogVO; +import com.wzj.soopin.content.service.IVlogPullService; import com.wzj.soopin.content.service.VlogService; import com.wzj.soopin.content.service.VlogUploadService; import com.wzj.soopin.content.utils.PagedGridResult; @@ -13,6 +14,7 @@ import com.wzj.soopin.content.utils.QcCloud; import com.wzj.soopin.content.utils.RedisOperator; import io.swagger.annotations.Api; import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.dromara.common.core.domain.R; @@ -22,6 +24,7 @@ import org.dromara.common.mq.domain.MQMessage; import org.dromara.common.mq.enums.MQMessageType; import org.dromara.common.mq.enums.MessageActionEnum; import org.dromara.common.mq.utils.MqUtil; +import org.dromara.common.redis.redis.RedisCache; import org.dromara.common.satoken.utils.LoginHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -38,34 +41,30 @@ import static com.wzj.soopin.content.domain.base.BaseInfoProperties.*; @Api(tags = "VlogController 短视频相关业务功能的接口") @RequestMapping("/app/vlog") @RestController +@AllArgsConstructor public class AppVlogController { - @Autowired - private VlogService vlogService; - @Autowired - private QcCloud qcCloud; - @Autowired - private VlogUploadService vlogUploadService; - @Autowired + private final VlogService vlogService; + private final QcCloud qcCloud; + private final VlogUploadService vlogUploadService; + public final RedisCache cache; + + private final IVlogPullService pullService; - public RedisOperator redis; @Tag(name = "首页视频列表") @PostMapping("/indexList") public R> indexList(@RequestBody IndexListBO bo, @RequestBody Page page) { - try{ - LoginUser loginUser = LoginHelper.getLoginUser(); - if (loginUser == null) { - throw new ServiceException("用户未登录"); - } - bo.setUserId(String.valueOf(loginUser.getUserId())); - }catch (Exception e){ - log.error("用户没登陆", e); + LoginUser loginUser = LoginHelper.getLoginUser(); + if (loginUser == null) { + throw new ServiceException("用户未登录"); } - Page pages = vlogService.getIndexVlogList(bo, page); + bo.setUserId(String.valueOf(loginUser.getUserId())); + Page pages = pullService.page(page); return R.ok(pages); } + @GetMapping("/detail/{vlogId}") - public R detail( @PathVariable String vlogId) { + public R detail(@PathVariable String vlogId) { return R.ok(vlogService.getVlogDetailById(vlogId)); } @@ -99,7 +98,7 @@ public class AppVlogController { throw new ServiceException("用户未登录"); } bo.setMyId(String.valueOf(loginUser.getUserId())); - Page pages = vlogService.getMyFollowVlogList( page); + Page pages = vlogService.getMyFollowVlogList(page); return R.ok(pages); } @@ -111,9 +110,10 @@ public class AppVlogController { throw new ServiceException("用户未登录"); } bo.setMyId(String.valueOf(loginUser.getUserId())); - Page pages = vlogService.getMyFriendVlogList( page); + Page pages = vlogService.getMyFriendVlogList(page); return R.ok(pages); } + @PostMapping("publish") public R publish(@RequestBody VlogBO vlogBO) throws Exception { @@ -126,12 +126,12 @@ public class AppVlogController { String url = vlogBO.getUrl(); - log.info("未审核视频地址:"+url); + log.info("未审核视频地址:" + url); String fileName = url.substring(url.lastIndexOf("/") + 1); - log.info("视频文件名称:"+fileName); - log.info("开始上传腾讯云点播:"+fileName); + log.info("视频文件名称:" + fileName); + log.info("开始上传腾讯云点播:" + fileName); String fileId = qcCloud.uploadViaTempFile(fileName); - log.info("视频发布ID:"+fileId); + log.info("视频发布ID:" + fileId); vlogBO.setFileId(fileId); // 删除minio文件 // MinIOUtils.removeFile(minIOConfig.getBucketName(),fileName); @@ -142,6 +142,7 @@ public class AppVlogController { return R.ok(); } + @Tag(name = "我的公开视频列表") @PostMapping("/myPublicList") public R> myPublicList(@RequestBody MyListBO bo, @RequestBody Page page) { @@ -149,155 +150,50 @@ public class AppVlogController { return R.ok(pages); } - - - private Integer nacosConuts=0; - - - @PostMapping("like") + @PostMapping("/like") public R like(@RequestBody VlogBO vlogBO) { LoginUser loginUser = LoginHelper.getLoginUser(); if (loginUser == null) { throw new ServiceException("用户未登录"); } - String userId = String.valueOf(loginUser.getUserId()); String vlogId = vlogBO.getId(); - - - //获取vlog - Vlog vlog = vlogService.getVlog(vlogId); - if(vlog==null){ - throw new ServiceException("视频不存在"); - } - // 我点赞的视频,关联关系保存到数据库 vlogService.userLikeVlog(userId, vlogId); - - // 点赞后,视频和视频发布者的获赞都会 +1 - redis.increment(REDIS_VLOGER_BE_LIKED_COUNTS + ":" + vlog.getMemberId(), 1); - redis.increment(REDIS_VLOG_BE_LIKED_COUNTS + ":" + vlogId, 1); - - // 我点赞的视频,需要在redis中保存关联关系 - redis.set(REDIS_USER_LIKE_VLOG + ":" + userId + ":" + vlogId, "1"); - - log.info("nacosConuts="+nacosConuts); - - String countsStr = redis.get(REDIS_VLOG_BE_LIKED_COUNTS + ":" + vlogId); - Integer counts=0; - if (StringUtils.isNotBlank(countsStr)){ - - counts=Integer.valueOf(countsStr); - if (counts>=nacosConuts){ - - vlogService.flushCounts(vlogId, counts); - } - } - if (userId != null && vlog.getMemberId() != null && !userId.equals(vlog.getMemberId())) { - // 新版:使用模板类型编号和参数 - Map params = new HashMap<>(); - params.put("userId", userId); - params.put("nickname", loginUser.getNickname()); - params.put("action", MessageActionEnum.INTERACTION_LIKE.name()); - params.put("toUserId",vlog.getMemberId()); - MQMessage message = MQMessage.builder() - .messageType(MQMessageType.IM.name()) - .data(params) - .source("member") - .build(); - // 关注消息 - MqUtil.sendIMMessage(message); - - } - return R.ok(); } - @PostMapping("unlike") + + @PostMapping("/unlike") public R unlike(@RequestBody Map params) { LoginUser loginUser = LoginHelper.getLoginUser(); if (loginUser == null) { throw new ServiceException("用户未登录"); } - String userId = String.valueOf(loginUser.getUserId()); String vlogId = params.get("vlogId"); - - //获取vlog - Vlog vlog = vlogService.getVlog(vlogId); - if(vlog==null){ - throw new ServiceException("视频不存在"); - } - - // 我取消点赞的视频,关联关系删除 vlogService.userUnLikeVlog(userId, vlogId); - - redis.decrement(REDIS_VLOGER_BE_LIKED_COUNTS + ":" + vlog.getMemberId(), 1); - redis.decrement(REDIS_VLOG_BE_LIKED_COUNTS + ":" + vlogId, 1); - redis.del(REDIS_USER_LIKE_VLOG + ":" + userId + ":" + vlogId); - return R.ok(); } + @GetMapping("/read/{vlogId}") + public R read(@PathVariable String vlogId) { + LoginUser loginUser = LoginHelper.getLoginUser(); + if (loginUser == null) { + throw new ServiceException("用户未登录"); + } + vlogService.readVlog(loginUser.getUserId(), vlogId); + return R.ok(); + } + @Tag(name = "手动触发缓存点赞最多视频") @PostMapping("/cacheTopLikedVlogs") public R cacheTopLikedVlogs(@RequestParam(defaultValue = "100") int limit) { try { - vlogService.cacheTopLikedVlogs(limit); +// vlogService.cacheTopLikedVlogs(limit); return R.ok(); } catch (Exception e) { log.error("手动触发缓存点赞最多视频失败", e); return R.fail("缓存失败: " + e.getMessage()); } } - - @Tag(name = "获取缓存中的点赞最多视频") - @GetMapping("/getTopLikedVlogs") - public R getTopLikedVlogs(@RequestParam(defaultValue = "") String date, - @RequestParam(defaultValue = "10") int pageSize, - @RequestParam(defaultValue = "0") int pageNum) { - try { - String redisKey; - if (StringUtils.isBlank(date)) { - // 如果没有指定日期,使用今天的日期 - redisKey = "top_liked_vlogs:" + java.time.LocalDateTime.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd")); - } else { - redisKey = "top_liked_vlogs:" + date; - } - - String cachedData = redis.get(redisKey); - List> resultList = new ArrayList<>(); - - if (StringUtils.isNotBlank(cachedData)) { - // 解析JSON数据 - ObjectMapper objectMapper = new ObjectMapper(); - List> vlogList = objectMapper.readValue(cachedData, new com.fasterxml.jackson.core.type.TypeReference>>() {}); - - // 计算分页 - int startIndex = pageNum * pageSize; - int endIndex = Math.min(startIndex + pageSize, vlogList.size()); - - if (startIndex < vlogList.size()) { - // 从Redis缓存中获取指定页的数据 - resultList = vlogList.subList(startIndex, endIndex); - log.info("从Redis缓存中获取了{}条视频数据", resultList.size()); - } - } - - // 如果Redis中的数据不足,从数据库随机查询补充 - if (resultList.size() < pageSize) { - int needMore = pageSize - resultList.size(); - log.info("Redis缓存数据不足,需要从数据库随机查询{}条视频", needMore); - - // 从数据库随机查询视频 - List> randomVlogs = vlogService.getRandomVlogs(needMore); - resultList.addAll(randomVlogs); - log.info("从数据库随机查询了{}条视频", randomVlogs.size()); - } - - return R.ok(resultList); - } catch (Exception e) { - log.error("获取缓存中的点赞最多视频失败", e); - return R.fail("获取缓存失败: " + e.getMessage()); - } - } } diff --git a/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/constant/GlobalConstants.java b/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/constant/GlobalConstants.java index c2723c880..d37da38e3 100644 --- a/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/constant/GlobalConstants.java +++ b/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/constant/GlobalConstants.java @@ -36,4 +36,9 @@ public interface GlobalConstants { * 会员 redis key */ String MEMBER_KEY = GLOBAL_REDIS_KEY + "member:"; + + /** + * 视频 redis key + */ + String VLOG_KEY = GLOBAL_REDIS_KEY + "vlog:"; } diff --git a/ruoyi-common/ruoyi-common-mq/pom.xml b/ruoyi-common/ruoyi-common-mq/pom.xml index 97f1eb7d5..0ae0f2504 100644 --- a/ruoyi-common/ruoyi-common-mq/pom.xml +++ b/ruoyi-common/ruoyi-common-mq/pom.xml @@ -48,6 +48,10 @@ 2.15.2 - + + org.apache.rocketmq + rocketmq-client + 5.3.0 + diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java index 0e9c994ae..23031d00b 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/MqUtil.java @@ -1,8 +1,13 @@ package org.dromara.common.mq.utils; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.LitePullConsumer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.dromara.common.json.utils.JsonUtils; import org.dromara.common.mq.config.RocketMQConfig; @@ -13,18 +18,40 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + @Slf4j @Component public class MqUtil implements ApplicationContextAware { + private static final String NAME_SERVER_ADDR = "localhost:9876"; + private static final String CONSUMER_GROUP = "TEST_PULL_GROUP"; + private static final String TOPIC = "TEST_TOPIC"; private static RocketMQTemplate template; - + private static Map consumerMap = new ConcurrentHashMap<>(); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { template = applicationContext.getBean(RocketMQTemplate.class); } + public void init() throws Exception { + // 1. 创建消费者实例,指定消费者组 + DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("YOUR_PULL_CONSUMER_GROUP"); + + consumer.setNamesrvAddr("localhost:9876"); // 替换为您的NameServer地址 + consumer.subscribe("YOUR_TOPIC", "*"); + consumer.start(); + // 3. 设置从何处开始消费(首次启动时) + //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + // 4. 订阅主题和Tag + consumer.subscribe("YOUR_TOPIC", "*"); // 使用*消费所有Tag + // 5. 启动消费者 + consumer.start(); + System.out.println("LitePullConsumer 启动成功"); + } + // - public static void sendMessage(String topic, MQMessage message) { + public static void sendMessage(String destination, MQMessage message) { if (template == null) { log.error("RocketMQTemplate未初始化,无法发送消息"); return; @@ -32,10 +59,10 @@ public class MqUtil implements ApplicationContextAware { try { String jsonMessage = JsonUtils.toJsonString(message); - template.convertAndSend(topic, jsonMessage); - log.info("发送消息到RocketMQ成功,topic: {}, message: {}", topic, jsonMessage); + template.convertAndSend(destination, jsonMessage); + log.info("发送消息到RocketMQ成功,topic: {}, message: {}", destination, jsonMessage); } catch (Exception e) { - log.error("发送消息到RocketMQ失败,topic: {}, message: {}", topic, message, e); + log.error("发送消息到RocketMQ失败,topic: {}, message: {}", destination, message, e); } } public static void sendIMMessage( MQMessage message) { @@ -97,5 +124,4 @@ public class MqUtil implements ApplicationContextAware { } } - } diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java new file mode 100644 index 000000000..80f40478d --- /dev/null +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/utils/UserRocketMQConsumerManager.java @@ -0,0 +1,333 @@ +package org.dromara.common.mq.utils; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * 带超时自动销毁功能的用户级RocketMQ消费者管理器 + * 支持按用户隔离消费者,并自动清理超时未活动的实例 + */ +@Component +public class UserRocketMQConsumerManager { + // 全局NameServer地址 + @Value("rocketmq.name-server") + + + private String namesrvAddr="192.168.1.65:9876"; + + // 超时时间(毫秒),默认30分钟 + private long timeoutMillis = 30 * 60 * 1000; + + // 定时检查间隔(毫秒),默认5分钟 + private long checkIntervalMillis = 5 * 60 * 1000; + + // 存储用户与消费者的映射(线程安全) + private final Map userConsumerMap = new ConcurrentHashMap<>(); + + // 存储每个用户的消费偏移量(用户ID -> 队列 -> 偏移量) + private final Map> userOffsetMap = new ConcurrentHashMap<>(); + + // 定时任务执行器,用于检测超时消费者 + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + private final String consumerGroup="user_consumer_group"; + private final String topic="MEMBER_VLOG_MSG"; + /** + * 初始化管理器 + */ + public UserRocketMQConsumerManager() { + if (namesrvAddr == null || namesrvAddr.trim().isEmpty()) { + throw new IllegalArgumentException("NameServer地址不能为空"); + } + System.setProperty("rocketmq.namesrv.addr", namesrvAddr); + // 启动定时清理任务 + startTimeoutChecker(); + } + + /** + * 启动超时检查定时任务 + */ + private void startTimeoutChecker() { + scheduler.scheduleAtFixedRate(() -> { + try { + checkAndDestroyTimeoutConsumers(); + } catch (Exception e) { + System.err.println("超时检查任务执行失败: " + e.getMessage()); + } + }, checkIntervalMillis, checkIntervalMillis, TimeUnit.MILLISECONDS); + System.out.println("超时检查任务已启动,检查间隔: " + checkIntervalMillis + "ms,超时时间: " + timeoutMillis + "ms"); + } + + /** + * 检查并销毁超时未活动的消费者 + */ + private void checkAndDestroyTimeoutConsumers() { + long currentTime = System.currentTimeMillis(); + List timeoutUserIds = new ArrayList<>(); + + // 收集超时用户 + for (Map.Entry entry : userConsumerMap.entrySet()) { + String userId = entry.getKey(); + UserConsumerContext context = entry.getValue(); + + if (currentTime - context.lastActiveTime > timeoutMillis) { + timeoutUserIds.add(userId); + } + } + + // 销毁超时消费者 + for (String userId : timeoutUserIds) { + System.out.println("用户[" + userId + "]的消费者已超时未活动,自动销毁"); + destroyConsumer(userId); + } + } + + /** + * 为指定用户创建消费者实例 + * @param userId 用户唯一标识 + * @param consumerGroup 该用户的消费组 + * @param topic 消费的主题 + * @return 是否创建成功 + */ + public boolean createConsumer(String userId, String consumerGroup, String topic) throws MQClientException { + validateUserId(userId); + + // 若用户已存在消费者,更新活动时间并返回 + if (userConsumerMap.containsKey(userId)) { + UserConsumerContext context = userConsumerMap.get(userId); + context.lastActiveTime = System.currentTimeMillis(); + System.out.println("用户[" + userId + "]的消费者已存在,更新活动时间"); + return false; + } + + // 创建消费者 + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup); + consumer.setMessageModel(MessageModel.CLUSTERING); + consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()); + consumer.start(); + + // 初始化该用户的偏移量存储 + userOffsetMap.putIfAbsent(userId, new ConcurrentHashMap<>()); + + // 存储用户消费者上下文 + UserConsumerContext context = new UserConsumerContext( + consumer, + consumerGroup, + topic, + System.currentTimeMillis() + ); + userConsumerMap.put(userId, context); + System.out.println("用户[" + userId + "]的消费者创建成功,消费组: " + consumerGroup); + return true; + } + + /** + * 为指定用户拉取消息(会更新活动时间) + * @param userId 用户唯一标识 + * @param batchSize 每次拉取数量 + * @return 拉取到的消息列表 + */ + public List pullUserMessages(String userId , int batchSize) + throws MQClientException, InterruptedException, MQBrokerException, RemotingException { + validateUserId(userId); + UserConsumerContext context = getUserConsumerContext(userId); + + // 更新最后活动时间 + context.lastActiveTime = System.currentTimeMillis(); + + // 获取用户的消息队列 + Set mqs = context.consumer.fetchSubscribeMessageQueues(context.topic); + List allMessages = new ArrayList<>(); + + // 拉取所有队列的消息 + for (MessageQueue mq : mqs) { + PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize); + if (result.getPullStatus() == PullStatus.FOUND) { + allMessages.addAll(result.getMsgFoundList()); + // 更新用户的偏移量 + updateUserOffset(userId, mq, result.getNextBeginOffset()); + } + } + + return allMessages; + } + + /** + * 获取指定用户的消费者上下文 + */ + private UserConsumerContext getUserConsumerContext(String userId) throws MQClientException { + UserConsumerContext context = userConsumerMap.get(userId); + if (context == null) { + createConsumer(userId, consumerGroup,topic); + } + return userConsumerMap.get(userId); + } + + /** + * 拉取单个队列的消息 + */ + private PullResult pullSingleQueueMessage(String userId, DefaultMQPullConsumer consumer, + MessageQueue mq, int batchSize) + throws MQClientException, MQBrokerException, RemotingException, InterruptedException { + // 获取该用户在该队列的偏移量 + long offset = getUserQueueOffset(userId, mq); + return consumer.pull(mq, userId, offset, batchSize); + } + + /** + * 获取用户在指定队列的消费偏移量 + */ + private long getUserQueueOffset(String userId, MessageQueue mq) { + Map queueOffsetMap = userOffsetMap.get(userId); + return queueOffsetMap.getOrDefault(mq, 0L); + } + + /** + * 更新用户在指定队列的消费偏移量 + */ + private void updateUserOffset(String userId, MessageQueue mq, long offset) { + Map queueOffsetMap = userOffsetMap.get(userId); + queueOffsetMap.put(mq, offset); + } + + /** + * 手动销毁指定用户的消费者 + * @param userId 用户唯一标识 + * @return 是否销毁成功 + */ + public boolean destroyConsumer(String userId) { + validateUserId(userId); + UserConsumerContext context = userConsumerMap.remove(userId); + if (context != null) { + context.consumer.shutdown(); + userOffsetMap.remove(userId); + System.out.println("用户[" + userId + "]的消费者已销毁,消费组: " + context.consumerGroup); + return true; + } + return false; + } + + /** + * 获取当前所有用户的消费者列表 + */ + public Set getRegisteredUserIds() { + return userConsumerMap.keySet(); + } + + /** + * 校验用户ID + */ + private void validateUserId(String userId) { + if (userId == null || userId.trim().isEmpty()) { + throw new IllegalArgumentException("用户ID不能为空"); + } + } + + /** + * 设置超时时间(毫秒) + */ + public void setTimeoutMillis(long timeoutMillis) { + if (timeoutMillis <= 0) { + throw new IllegalArgumentException("超时时间必须大于0"); + } + this.timeoutMillis = timeoutMillis; + System.out.println("已更新超时时间: " + timeoutMillis + "ms"); + } + + /** + * 设置检查间隔(毫秒) + */ + public void setCheckIntervalMillis(long checkIntervalMillis) { + if (checkIntervalMillis <= 0 || checkIntervalMillis > timeoutMillis) { + throw new IllegalArgumentException("检查间隔必须大于0且小于等于超时时间"); + } + this.checkIntervalMillis = checkIntervalMillis; + // 重新启动定时任务 + scheduler.shutdownNow(); + startTimeoutChecker(); + } + + /** + * 关闭管理器,释放所有资源 + */ + public void shutdown() { + scheduler.shutdown(); + // 销毁所有消费者 + for (String userId : new ArrayList<>(userConsumerMap.keySet())) { + destroyConsumer(userId); + } + System.out.println("消费者管理器已关闭"); + } + + /** + * 用户消费者上下文(存储消费者实例及活动时间) + */ + private static class UserConsumerContext { + DefaultMQPullConsumer consumer; + String consumerGroup; + String topic; + long createTime; // 创建时间 + long lastActiveTime; // 最后活动时间(拉取消息时间) + + public UserConsumerContext(DefaultMQPullConsumer consumer, String consumerGroup, + String topic, long createTime) { + this.consumer = consumer; + this.consumerGroup = consumerGroup; + this.topic = topic; + this.createTime = createTime; + this.lastActiveTime = createTime; // 初始活动时间=创建时间 + } + } + + // 测试示例 + public static void main(String[] args) throws InterruptedException { + try { + // 初始化管理器(设置超时时间为10秒,检查间隔为5秒) + UserRocketMQConsumerManager manager = new UserRocketMQConsumerManager( ); + manager.setTimeoutMillis(10 * 1000); + manager.setCheckIntervalMillis(5 * 1000); + + // 为用户1创建消费者 + manager.createConsumer("user1", "user1_group", "user_topic"); + System.out.println("当前注册用户: " + manager.getRegisteredUserIds()); + + // 用户1拉取消息(更新活动时间) + System.out.println("\n用户1首次拉取消息:"); + manager.pullUserMessages("user1", 5); + + // 等待6秒(未超时) + Thread.sleep(6000); + System.out.println("\n6秒后注册用户: " + manager.getRegisteredUserIds()); + + // 再等待5秒(累计11秒,超过10秒超时) + Thread.sleep(5000); + System.out.println("\n11秒后注册用户: " + manager.getRegisteredUserIds()); + + // 关闭管理器 + manager.shutdown(); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} + diff --git a/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/redis/RedisCache.java b/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/redis/RedisCache.java index 165171a53..f7781add8 100644 --- a/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/redis/RedisCache.java +++ b/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/redis/RedisCache.java @@ -7,6 +7,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Component; +import java.sql.ShardingKey; import java.util.*; import java.util.concurrent.TimeUnit; @@ -121,7 +122,7 @@ public class RedisCache * 获得缓存的list对象 * * @param key 缓存的键值 - * @return 缓存键值对应的数据 + * * @return 缓存键值对应的数据 */ public List getCacheList(final String key) { @@ -230,4 +231,97 @@ public class RedisCache } + //zeset类型的增删改查 + /** + * 新增zset + * @param key + * @param value + * @param score + */ + public void zSetAdd(String key, String value, double score) { + redisTemplate.opsForZSet().add(key, value, score); + } + + /** + * 查询zset + * @param key + * @param start + * @param end + * @return + */ + public Set zSetRange(String key, long start, long end) { + return redisTemplate.opsForZSet().range(key, start, end); + } + /** + * 删除zset + * @param key + * @param value + */ + public void zSetRemove(String key, String value) { + redisTemplate.opsForZSet().remove(key, value); + } + /** + * 查询zset分数 + * @param key + * @param value + * @return + */ + public Double zSetScore(String key, String value) { + return redisTemplate.opsForZSet().score(key, value); + } + /** + * 查询zset长度 + * @param key + * @return + */ + public Long zSetSize(String key) { + return redisTemplate.opsForZSet().size(key); + } + /** + * 查询zset排名 + * @param key + * @param value + * @return + */ + public Long zSetRank(String key, String value) { + return redisTemplate.opsForZSet().rank(key, value); + } + /** + * 排行榜 + * @param key + * @param start + * @param end + * @return + */ + public Set zSetReverseRange(String key, long start, long end) { + return redisTemplate.opsForZSet().reverseRange(key, start, end); + } + /** + * 增加增量 + * @param key + * @param value + * @param delta + */ + public void zSetIncrement(String key, String value, double delta) { + redisTemplate.opsForZSet().incrementScore(key, value, delta); + } + /** + * 减少数量 + * @param key + * @param value + * @param delta + */ + public void zSetDecrement(String key, String value, double delta) { + redisTemplate.opsForZSet().incrementScore(key, value, -delta); + } + + /** + * 是否有这个成员 + * @param key + * @param value + * @return + */ + public boolean zSetHasMember(String key, String value) { + return redisTemplate.opsForZSet().rank(key, value) != null; + } } diff --git a/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java b/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java index 355cd2931..228502cd0 100644 --- a/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java +++ b/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java @@ -7,10 +7,7 @@ import org.redisson.api.*; import org.redisson.api.options.KeysScanOptions; import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -581,4 +578,6 @@ public class RedisUtils { RKeys rKeys = CLIENT.getKeys(); return rKeys.countExists(key) > 0; } + + } diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/ArticleCategoryController.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/ArticleCategoryController.java index 9d707a5a5..be6a47c41 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/ArticleCategoryController.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/ArticleCategoryController.java @@ -27,7 +27,7 @@ import java.util.List; */ @Tag(name = "文章分类管理") @RestController -@RequestMapping("/content/article/category") +@RequestMapping("/cms/article/category") @RequiredArgsConstructor public class ArticleCategoryController { diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/ArticleController.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/ArticleController.java index 253d9d3b6..f3826d7c3 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/ArticleController.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/ArticleController.java @@ -27,7 +27,7 @@ import java.util.List; */ @Tag(name = "内容管理") @RestController -@RequestMapping("/content/article") +@RequestMapping("/cms/article") @RequiredArgsConstructor public class ArticleController { diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/VlogController.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/VlogController.java index aba847e2c..e5963d13b 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/VlogController.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/VlogController.java @@ -5,6 +5,7 @@ package com.wzj.soopin.content.controller; import com.wzj.soopin.content.domain.base.BaseInfoProperties; import com.wzj.soopin.content.domain.bo.VlogBO; import com.wzj.soopin.content.enums.YesOrNo; +import com.wzj.soopin.content.service.IVlogPushService; import org.dromara.common.core.domain.R; import com.wzj.soopin.content.service.VlogService; import com.wzj.soopin.content.service.VlogUploadService; @@ -33,7 +34,7 @@ import java.util.ArrayList; @Slf4j @Api(tags = "VlogController 短视频相关业务功能的接口") -@RequestMapping("/vlog") +@RequestMapping("/cms/vlog") @RestController public class VlogController extends BaseInfoProperties { @@ -45,6 +46,8 @@ public class VlogController extends BaseInfoProperties { private VlogUploadService vlogUploadService; @Autowired public RedisOperator redis; + @Autowired + private IVlogPushService vlogPushService; @PostMapping("vodCallBack") public R vodCallBack(@RequestBody Map callbackData) { @@ -194,7 +197,13 @@ public class VlogController extends BaseInfoProperties { public R totalLikedCounts(@RequestParam String vlogId) { return R.ok(vlogService.getVlogBeLikedCounts(vlogId)); } - - + @GetMapping("/push/hot") + public R pushHot(@RequestParam int count) { + return R.ok(vlogPushService.cacheTopLikedVlogs(count)); + } + @GetMapping("/push/member") + public R pushMember(@RequestParam int count,@RequestParam Long memberId) { + return R.ok(vlogPushService.pushVlogToMember(count,memberId)); + } } diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/admin/CommentController.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/admin/CommentController.java index d21dd8fbe..b4ac60e4c 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/admin/CommentController.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/admin/CommentController.java @@ -33,7 +33,7 @@ import com.wzj.soopin.content.enums.MessageEnum; @Slf4j @Api(tags = "管理端-评论管理接口") -@RequestMapping("/comment") +@RequestMapping("/cms/comment") @RestController public class CommentController extends BaseController { diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/admin/VlogUploadController.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/admin/VlogUploadController.java index 62f264770..843781d7c 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/admin/VlogUploadController.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/controller/admin/VlogUploadController.java @@ -24,7 +24,7 @@ import java.util.*; @Slf4j @Api(tags = "管理端-视频") -@RequestMapping("/video") +@RequestMapping("/cms/video") @RestController public class VlogUploadController extends BaseInfoProperties { diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/mapper/VlogMapper.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/mapper/VlogMapper.java index a7c305563..d77e5fc9a 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/mapper/VlogMapper.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/mapper/VlogMapper.java @@ -168,4 +168,8 @@ public interface VlogMapper extends BaseMapper { "ORDER BY RAND() " + "LIMIT #{limit}") List> selectRandomVlogs(@Param("limit") int limit); + + + IPage getVlogForUser(Page page, @Param("memberId") Long memberId); + int readVlog(@Param("memberId") Long memberId, @Param("vlogId") String vlogId); } diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/IVlogPullService.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/IVlogPullService.java new file mode 100644 index 000000000..66f7ed959 --- /dev/null +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/IVlogPullService.java @@ -0,0 +1,12 @@ +package com.wzj.soopin.content.service; + +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.wzj.soopin.content.domain.vo.IndexVlogVO; + +public interface IVlogPullService { + + /** + * 从视频库中拉取视频 + */ + Page page(Page page); +} diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/IVlogPushService.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/IVlogPushService.java new file mode 100644 index 000000000..bff3ad1c2 --- /dev/null +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/IVlogPushService.java @@ -0,0 +1,12 @@ +package com.wzj.soopin.content.service; + +/** + * 视频推送服务 + * @author wqx + * @date 2023-12-20 + */ +public interface IVlogPushService { + boolean cacheTopLikedVlogs(int limit) ; + + boolean pushVlogToMember(int count,Long memberId); +} diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/VlogService.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/VlogService.java index 4aa521de5..d12f3ebd1 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/VlogService.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/VlogService.java @@ -2,6 +2,7 @@ package com.wzj.soopin.content.service; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.IService; import com.wzj.soopin.content.domain.bo.*; import com.wzj.soopin.content.domain.po.Vlog; import com.wzj.soopin.content.domain.vo.IndexVlogVO; @@ -10,7 +11,7 @@ import com.wzj.soopin.content.utils.PagedGridResult; import java.util.List; import java.util.Map; -public interface VlogService { +public interface VlogService extends IService { /** * 修改视频首帧图 */ @@ -25,10 +26,6 @@ public interface VlogService { */ public void createVlog(VlogBO vlogBO); - /** - * 查询首页/搜索的vlog列表 - */ - public Page getIndexVlogList(IndexListBO bo, Page page); /** * 根据视频主键查询vlog @@ -115,10 +112,6 @@ public interface VlogService { */ List> getLikedUsers(String vlogId); - /** - * 获取视频点赞数 - */ - int getLikeCounts(String vlogId); /** * 获取视频上传者信息 @@ -135,11 +128,6 @@ public interface VlogService { */ IPage> getVlogListByMobile(Page> page, VlogBO vlogBO); - /** - * 查询点赞最多的视频列表并存储到Redis - * @param limit 查询数量限制 - */ - void cacheTopLikedVlogs(int limit); /** * 随机查询视频列表 @@ -147,4 +135,10 @@ public interface VlogService { * @return 随机视频列表 */ List> getRandomVlogs(int limit); + + IPage getVlogForUser(Page page, Long memberId); + int readVlog(Long memberId, String vlogId); + + List getIndexVlogList(Map paramMap); + } diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/MsgServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/MsgServiceImpl.java index dbac54880..7713e20b2 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/MsgServiceImpl.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/MsgServiceImpl.java @@ -80,7 +80,7 @@ public class MsgServiceImpl extends BaseInfoProperties implements MsgService { map = new HashMap<>(); } - String relationship = redis.get(REDIS_FANS_AND_VLOGGER_RELATIONSHIP + ":" + msg.getToUserId() + ":" + msg.getFromUserId()); + String relationship = redis.getCacheObject(REDIS_FANS_AND_VLOGGER_RELATIONSHIP + ":" + msg.getToUserId() + ":" + msg.getFromUserId()); if (StringUtils.isNotBlank(relationship) && relationship.equalsIgnoreCase("1")) { map.put("isFriend", true); } else { diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java new file mode 100644 index 000000000..3e6199d2e --- /dev/null +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPullServiceImpl.java @@ -0,0 +1,67 @@ +package com.wzj.soopin.content.service.impl; + +import com.google.gson.JsonObject; +import com.wzj.soopin.content.service.IVlogPullService; + +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.wzj.soopin.content.domain.vo.IndexVlogVO; +import com.wzj.soopin.content.service.VlogService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; +import org.dromara.common.core.domain.model.LoginUser; +import org.dromara.common.core.exception.ServiceException; +import org.dromara.common.json.utils.JsonUtils; +import org.dromara.common.mq.domain.MQMessage; +import org.dromara.common.mq.utils.UserRocketMQConsumerManager; +import org.dromara.common.satoken.utils.LoginHelper; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Service +@AllArgsConstructor +@Slf4j +public class VlogPullServiceImpl implements IVlogPullService { + + private final UserRocketMQConsumerManager mqConsumerManager; + private final VlogService vlogService; + + @Override + public Page page(Page page) { + + List messageExts = new ArrayList<>(); + + try { + LoginUser loginUser = LoginHelper.getLoginUser(); + if (loginUser == null) { + messageExts = mqConsumerManager.pullUserMessages("hot", (int) page.getSize()); + } else { + messageExts = mqConsumerManager.pullUserMessages(loginUser.getUserId() + "", (int) page.getSize()); + + } + List ids=messageExts.stream().map(messageExt -> { + MQMessage mqMessage = JsonUtils.parseObject(messageExt.getBody(), MQMessage.class); + return mqMessage.getData().toString(); + }).collect(Collectors.toList()); + + if(ids.size()>0){ + Map paramMap = new HashMap<>(); + paramMap.put("ids", ids); + List vlogVOList = vlogService.getIndexVlogList(paramMap); + return page.setRecords(vlogVOList); + }else{ + return page; + } + + } catch (Exception e) { + log.error("拉取视频失败", e); + } + return page; + } +} diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java new file mode 100644 index 000000000..e852e9808 --- /dev/null +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogPushServiceImpl.java @@ -0,0 +1,112 @@ +package com.wzj.soopin.content.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.wzj.soopin.content.domain.po.Vlog; +import com.wzj.soopin.content.domain.vo.IndexVlogVO; +import com.wzj.soopin.content.domain.vo.VlogerVO; +import com.wzj.soopin.content.service.IVlogPushService; +import com.wzj.soopin.content.service.VlogService; +import com.wzj.soopin.member.domain.po.Member; +import com.wzj.soopin.member.domain.vo.MemberVO; +import com.wzj.soopin.member.service.IMemberService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.dromara.common.core.utils.MapstructUtils; +import org.dromara.common.mq.domain.MQMessage; +import org.dromara.common.mq.utils.MqUtil; +import org.dromara.common.redis.redis.RedisCache; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; + +import static com.wzj.soopin.content.domain.base.BaseInfoProperties.*; + +@Service +@AllArgsConstructor +@Slf4j +public class VlogPushServiceImpl implements IVlogPushService { + + private final RedisCache redisCache; + private final VlogService vlogService; + + private final IMemberService memberService; + + @Override + public boolean cacheTopLikedVlogs(int limit ) { + try { + // 1. 获取 ZSet 中所有成员(按分数升序排列) + // 这里我们只关心成员(member),不关心分数(score),所以使用 range(key, start, end) + log.info("开始查询点赞最多的{}条视频", limit); + Set rankSet = redisCache.zSetRange(REDIS_VLOG_BE_LIKED_COUNTS, 0, limit); + if (rankSet == null ||rankSet.isEmpty()) { + return false; + } + if(rankSet!=null&&rankSet.size() vlogPage = new Page<>(1, limit-rankSet.size()); + vlogPage = vlogService.page(vlogPage, new LambdaQueryWrapper() + .orderByDesc(Vlog::getLikeCounts)); + Set vlogIds = vlogPage.getRecords().stream().map(vlog -> vlog.getId()).collect(Collectors.toSet()); + rankSet.addAll(vlogIds); + } + // 3. 将 Set 转换为 List 以便打乱顺序 + List vlogList = new ArrayList<>(rankSet); + + // 4. 使用 Collections.shuffle 打乱 List 的顺序 + Collections.shuffle(vlogList); + vlogList.stream().forEach(vlogId -> { + + //将数据发送的mq的热点数据队列 + MQMessage message = MQMessage.builder() + .topic("MEMBER_VLOG_MSG") + .tag("hot") + .messageType("json") + .data(vlogId) + .source("vlog_service") + .sendTime(LocalDateTime.now()) + .build(); + MqUtil.sendMessage("MEMBER_VLOG_MSG", message); + }); + + } catch (Exception e) { + log.error("缓存点赞最多视频到Redis失败", e); + } + return true; + } + + @Override + public boolean pushVlogToMember(int count,Long memberId ) { + + Member member = memberService.getById(memberId); + String cityCode = member.getCity(); + IPage vlogPage=vlogService.getVlogForUser(new Page<>(1, 100), memberId); + if(CollectionUtils.isEmpty(vlogPage.getRecords())){ + return false; + } + vlogPage.getRecords().stream().forEach(vlogId -> { + //将数据发送的mq的热点数据队列 + MQMessage message = MQMessage.builder() + .topic("MEMBER_VLOG_MSG") + .tag(memberId+"") + .messageType("json") + .data(vlogId) + .source("vlog_service") + .sendTime(LocalDateTime.now()) + .build(); + MqUtil.sendMessage("MEMBER_VLOG_MSG"+":"+memberId, message); + }); + + return true; + } + +} diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogUploadServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogUploadServiceImpl.java index c7fecf6e9..9bec923d7 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogUploadServiceImpl.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogUploadServiceImpl.java @@ -14,6 +14,7 @@ import com.wzj.soopin.content.utils.TencentCloudUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.mybatis.core.page.PageQuery; import org.dromara.common.mybatis.core.page.TableDataInfo; +import org.dromara.common.redis.redis.RedisCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -36,7 +37,7 @@ public class VlogUploadServiceImpl implements VlogUploadService { @Autowired private com.wzj.soopin.content.service.CommentService commentService; @Autowired - private com.wzj.soopin.content.utils.RedisOperator redis; + private RedisCache redisCache; @Autowired private com.wzj.soopin.content.mapper.VlogMapperCustom vlogMapperCustom; @@ -301,26 +302,11 @@ public class VlogUploadServiceImpl implements VlogUploadService { } // 2. 点赞数量:优先 Redis,无则 MySQL - String likeCountsStr = redis.get(com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_BE_LIKED_COUNTS + ":" + vlogId); - int likeCounts; - if (likeCountsStr != null) { - likeCounts = Integer.parseInt(likeCountsStr); - } else if (vlog != null && vlog.getLikeCounts() != null) { - likeCounts = vlog.getLikeCounts(); - } else { - likeCounts = 0; - } + int likeCounts=redisCache.zSetScore(com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_BE_LIKED_COUNTS , vlogId).intValue(); // 3. 评论数量:优先 Redis,无则 MySQL - String commentCountsStr = redis.get(com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_COMMENT_COUNTS + ":" + vlogId); - int commentCounts; - if (commentCountsStr != null) { - commentCounts = Integer.parseInt(commentCountsStr); - } else if (vlog != null && vlog.getCommentsCounts() != null) { - commentCounts = vlog.getCommentsCounts(); - } else { - commentCounts = 0; - } + int commentCounts=redisCache.zSetScore(com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_COMMENT_COUNTS , vlogId).intValue(); + // 4. 评论内容:只查 MySQL List commentList = new ArrayList<>(); diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/task/VlogScheduledTask.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/task/VlogScheduledTask.java index e472764bb..fca28ed6e 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/task/VlogScheduledTask.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/task/VlogScheduledTask.java @@ -27,7 +27,7 @@ public class VlogScheduledTask { public void jobExecute() { log.info("开始执行定时任务:查询点赞最多的100条视频并存储到Redis"); try { - vlogService.cacheTopLikedVlogs(100); +// vlogService.cacheTopLikedVlogs(100); log.info("定时任务执行完成:成功缓存点赞最多的100条视频到Redis"); } catch (Exception e) { log.error("定时任务执行失败:缓存点赞最多视频到Redis时发生异常", e); diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/utils/RedisOperator.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/utils/RedisOperator.java index 20ed89ff0..8756f9d7f 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/utils/RedisOperator.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/utils/RedisOperator.java @@ -24,286 +24,286 @@ public class RedisOperator { private StringRedisTemplate redisTemplate; // Key(键),简单的key-value操作 - - /** - * 判断key是否存在 - * @param key - * @return - */ - public boolean keyIsExist(String key) { - return redisTemplate.hasKey(key); - } - - /** - * 实现命令:TTL key,以秒为单位,返回给定 key的剩余生存时间(TTL, time to live)。 - * - * @param key - * @return - */ - public long ttl(String key) { - return redisTemplate.getExpire(key); - } - - /** - * 实现命令:expire 设置过期时间,单位秒 - * - * @param key - * @return - */ - public void expire(String key, long timeout) { - redisTemplate.expire(key, timeout, TimeUnit.SECONDS); - } - - /** - * 实现命令:increment key,增加key一次 - * - * @param key - * @return - */ - public long increment(String key, long delta) { - return redisTemplate.opsForValue().increment(key, delta); - } - - /** - * 累加,使用hash - */ - public long incrementHash(String name, String key, long delta) { - return redisTemplate.opsForHash().increment(name, key, delta); - } - - /** - * 累减,使用hash - */ - public long decrementHash(String name, String key, long delta) { - delta = delta * (-1); - return redisTemplate.opsForHash().increment(name, key, delta); - } - - /** - * hash 设置value - */ - public void setHashValue(String name, String key, String value) { - redisTemplate.opsForHash().put(name, key, value); - } - - /** - * hash 获得value - */ - public String getHashValue(String name, String key) { - return (String)redisTemplate.opsForHash().get(name, key); - } - - /** - * 实现命令:decrement key,减少key一次 - * - * @param key - * @return - */ - public long decrement(String key, long delta) { - return redisTemplate.opsForValue().decrement(key, delta); - } - - /** - * 实现命令:KEYS pattern,查找所有符合给定模式 pattern的 key - */ - public Set keys(String pattern) { - return redisTemplate.keys(pattern); - } - - /** - * 实现命令:DEL key,删除一个key - * - * @param key - */ - public void del(String key) { - redisTemplate.delete(key); - } - - // String(字符串) - - /** - * 实现命令:SET key value,设置一个key-value(将字符串值 value关联到 key) - * - * @param key - * @param value - */ - public void set(String key, String value) { - redisTemplate.opsForValue().set(key, value); - } - - /** - * 实现命令:SET key value EX seconds,设置key-value和超时时间(秒) - * - * @param key - * @param value - * @param timeout - * (以秒为单位) - */ - public void set(String key, String value, long timeout) { - redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.SECONDS); - } - - /** - * 如果key不存在,则设置,如果存在,则报错 - * @param key - * @param value - */ - public void setnx60s(String key, String value) { - redisTemplate.opsForValue().setIfAbsent(key, value, 60, TimeUnit.SECONDS); - } - - /** - * 如果key不存在,则设置,如果存在,则报错 - * @param key - * @param value - */ - public void setnx(String key, String value) { - redisTemplate.opsForValue().setIfAbsent(key, value); - } - - /** - * 实现命令:GET key,返回 key所关联的字符串值。 - * - * @param key - * @return value - */ - public String get(String key) { - return (String)redisTemplate.opsForValue().get(key); - } - - /** - * 批量查询,对应mget - * @param keys - * @return - */ - public List mget(List keys) { - return redisTemplate.opsForValue().multiGet(keys); - } - - /** - * 批量查询,管道pipeline - * @param keys - * @return - */ - public List batchGet(List keys) { - -// nginx -> keepalive -// redis -> pipeline - - List result = redisTemplate.executePipelined(new RedisCallback() { - @Override - public String doInRedis(RedisConnection connection) throws DataAccessException { - StringRedisConnection src = (StringRedisConnection)connection; - - for (String k : keys) { - src.get(k); - } - return null; - } - }); - - return result; - } - - - // Hash(哈希表) - - /** - * 实现命令:HSET key field value,将哈希表 key中的域 field的值设为 value - * - * @param key - * @param field - * @param value - */ - public void hset(String key, String field, Object value) { - redisTemplate.opsForHash().put(key, field, value); - } - - /** - * 实现命令:HGET key field,返回哈希表 key中给定域 field的值 - * - * @param key - * @param field - * @return - */ - public String hget(String key, String field) { - return (String) redisTemplate.opsForHash().get(key, field); - } - - /** - * 实现命令:HDEL key field [field ...],删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略。 - * - * @param key - * @param fields - */ - public void hdel(String key, Object... fields) { - redisTemplate.opsForHash().delete(key, fields); - } - - /** - * 实现命令:HGETALL key,返回哈希表 key中,所有的域和值。 - * - * @param key - * @return - */ - public Map hgetall(String key) { - return redisTemplate.opsForHash().entries(key); - } - - // List(列表) - - /** - * 实现命令:LPUSH key value,将一个值 value插入到列表 key的表头 - * - * @param key - * @param value - * @return 执行 LPUSH命令后,列表的长度。 - */ - public long lpush(String key, String value) { - return redisTemplate.opsForList().leftPush(key, value); - } - - /** - * 实现命令:LPOP key,移除并返回列表 key的头元素。 - * - * @param key - * @return 列表key的头元素。 - */ - public String lpop(String key) { - return (String)redisTemplate.opsForList().leftPop(key); - } - - /** - * 实现命令:RPUSH key value,将一个值 value插入到列表 key的表尾(最右边)。 - * - * @param key - * @param value - * @return 执行 LPUSH命令后,列表的长度。 - */ - public long rpush(String key, String value) { - return redisTemplate.opsForList().rightPush(key, value); - } - // List(列表) - /** - * 实现命令:LRANGE key start stop,返回列表key中指定区间内的元素 - * - * @param key Redis key - * @param start 开始索引 - * @param stop 结束索引 - * @return 返回指定区间的元素 - */ - public List lrange(String key, long start, long stop) { - return redisTemplate.opsForList().range(key, start, stop); - } - /** - * 实现命令:LREM key count value,移除列表中与 value 相等的元素 - * - * @param key Redis key - * @param count 删除的数量(正数表示从头部开始删除,负数从尾部,0表示删除全部匹配项) - * @param value 要删除的元素值 - * @return 被删除的元素个数 - */ - public Long lrem(String key, long count, Object value) { - return redisTemplate.opsForList().remove(key, count, value); - } +// +// /** +// * 判断key是否存在 +// * @param key +// * @return +// */ +// public boolean keyIsExist(String key) { +// return redisTemplate.hasKey(key); +// } +// +// /** +// * 实现命令:TTL key,以秒为单位,返回给定 key的剩余生存时间(TTL, time to live)。 +// * +// * @param key +// * @return +// */ +// public long ttl(String key) { +// return redisTemplate.getExpire(key); +// } +// +// /** +// * 实现命令:expire 设置过期时间,单位秒 +// * +// * @param key +// * @return +// */ +// public void expire(String key, long timeout) { +// redisTemplate.expire(key, timeout, TimeUnit.SECONDS); +// } +// +// /** +// * 实现命令:increment key,增加key一次 +// * +// * @param key +// * @return +// */ +// public long increment(String key, long delta) { +// return redisTemplate.opsForValue().increment(key, delta); +// } +// +// /** +// * 累加,使用hash +// */ +// public long incrementHash(String name, String key, long delta) { +// return redisTemplate.opsForHash().increment(name, key, delta); +// } +// +// /** +// * 累减,使用hash +// */ +// public long decrementHash(String name, String key, long delta) { +// delta = delta * (-1); +// return redisTemplate.opsForHash().increment(name, key, delta); +// } +// +// /** +// * hash 设置value +// */ +// public void setHashValue(String name, String key, String value) { +// redisTemplate.opsForHash().put(name, key, value); +// } +// +// /** +// * hash 获得value +// */ +// public String getHashValue(String name, String key) { +// return (String)redisTemplate.opsForHash().get(name, key); +// } +// +// /** +// * 实现命令:decrement key,减少key一次 +// * +// * @param key +// * @return +// */ +// public long decrement(String key, long delta) { +// return redisTemplate.opsForValue().decrement(key, delta); +// } +// +// /** +// * 实现命令:KEYS pattern,查找所有符合给定模式 pattern的 key +// */ +// public Set keys(String pattern) { +// return redisTemplate.keys(pattern); +// } +// +// /** +// * 实现命令:DEL key,删除一个key +// * +// * @param key +// */ +// public void del(String key) { +// redisTemplate.delete(key); +// } +// +// // String(字符串) +// +// /** +// * 实现命令:SET key value,设置一个key-value(将字符串值 value关联到 key) +// * +// * @param key +// * @param value +// */ +// public void set(String key, String value) { +// redisTemplate.opsForValue().set(key, value); +// } +// +// /** +// * 实现命令:SET key value EX seconds,设置key-value和超时时间(秒) +// * +// * @param key +// * @param value +// * @param timeout +// * (以秒为单位) +// */ +// public void set(String key, String value, long timeout) { +// redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.SECONDS); +// } +// +// /** +// * 如果key不存在,则设置,如果存在,则报错 +// * @param key +// * @param value +// */ +// public void setnx60s(String key, String value) { +// redisTemplate.opsForValue().setIfAbsent(key, value, 60, TimeUnit.SECONDS); +// } +// +// /** +// * 如果key不存在,则设置,如果存在,则报错 +// * @param key +// * @param value +// */ +// public void setnx(String key, String value) { +// redisTemplate.opsForValue().setIfAbsent(key, value); +// } +// +// /** +// * 实现命令:GET key,返回 key所关联的字符串值。 +// * +// * @param key +// * @return value +// */ +// public String get(String key) { +// return (String)redisTemplate.opsForValue().get(key); +// } +// +// /** +// * 批量查询,对应mget +// * @param keys +// * @return +// */ +// public List mget(List keys) { +// return redisTemplate.opsForValue().multiGet(keys); +// } +// +// /** +// * 批量查询,管道pipeline +// * @param keys +// * @return +// */ +// public List batchGet(List keys) { +// +//// nginx -> keepalive +//// redis -> pipeline +// +// List result = redisTemplate.executePipelined(new RedisCallback() { +// @Override +// public String doInRedis(RedisConnection connection) throws DataAccessException { +// StringRedisConnection src = (StringRedisConnection)connection; +// +// for (String k : keys) { +// src.get(k); +// } +// return null; +// } +// }); +// +// return result; +// } +// +// +// // Hash(哈希表) +// +// /** +// * 实现命令:HSET key field value,将哈希表 key中的域 field的值设为 value +// * +// * @param key +// * @param field +// * @param value +// */ +// public void hset(String key, String field, Object value) { +// redisTemplate.opsForHash().put(key, field, value); +// } +// +// /** +// * 实现命令:HGET key field,返回哈希表 key中给定域 field的值 +// * +// * @param key +// * @param field +// * @return +// */ +// public String hget(String key, String field) { +// return (String) redisTemplate.opsForHash().get(key, field); +// } +// +// /** +// * 实现命令:HDEL key field [field ...],删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略。 +// * +// * @param key +// * @param fields +// */ +// public void hdel(String key, Object... fields) { +// redisTemplate.opsForHash().delete(key, fields); +// } +// +// /** +// * 实现命令:HGETALL key,返回哈希表 key中,所有的域和值。 +// * +// * @param key +// * @return +// */ +// public Map hgetall(String key) { +// return redisTemplate.opsForHash().entries(key); +// } +// +// // List(列表) +// +// /** +// * 实现命令:LPUSH key value,将一个值 value插入到列表 key的表头 +// * +// * @param key +// * @param value +// * @return 执行 LPUSH命令后,列表的长度。 +// */ +// public long lpush(String key, String value) { +// return redisTemplate.opsForList().leftPush(key, value); +// } +// +// /** +// * 实现命令:LPOP key,移除并返回列表 key的头元素。 +// * +// * @param key +// * @return 列表key的头元素。 +// */ +// public String lpop(String key) { +// return (String)redisTemplate.opsForList().leftPop(key); +// } +// +// /** +// * 实现命令:RPUSH key value,将一个值 value插入到列表 key的表尾(最右边)。 +// * +// * @param key +// * @param value +// * @return 执行 LPUSH命令后,列表的长度。 +// */ +// public long rpush(String key, String value) { +// return redisTemplate.opsForList().rightPush(key, value); +// } +// // List(列表) +// /** +// * 实现命令:LRANGE key start stop,返回列表key中指定区间内的元素 +// * +// * @param key Redis key +// * @param start 开始索引 +// * @param stop 结束索引 +// * @return 返回指定区间的元素 +// */ +// public List lrange(String key, long start, long stop) { +// return redisTemplate.opsForList().range(key, start, stop); +// } +// /** +// * 实现命令:LREM key count value,移除列表中与 value 相等的元素 +// * +// * @param key Redis key +// * @param count 删除的数量(正数表示从头部开始删除,负数从尾部,0表示删除全部匹配项) +// * @param value 要删除的元素值 +// * @return 被删除的元素个数 +// */ +// public Long lrem(String key, long count, Object value) { +// return redisTemplate.opsForList().remove(key, count, value); +// } } diff --git a/ruoyi-modules/ruoyi-content/src/main/resources/mapper/content/VlogMapper.xml b/ruoyi-modules/ruoyi-content/src/main/resources/mapper/content/VlogMapper.xml index b71387915..27789e3d9 100644 --- a/ruoyi-modules/ruoyi-content/src/main/resources/mapper/content/VlogMapper.xml +++ b/ruoyi-modules/ruoyi-content/src/main/resources/mapper/content/VlogMapper.xml @@ -282,5 +282,27 @@ + + + + insert into cms_vlog_member (vlog_id, member_id,status) values (#{vlogId}, #{memberId},1) + diff --git a/ruoyi-modules/ruoyi-content/src/main/resources/mapper/content/VlogMapperCustom.xml b/ruoyi-modules/ruoyi-content/src/main/resources/mapper/content/VlogMapperCustom.xml index d7b50fd50..a1d368870 100644 --- a/ruoyi-modules/ruoyi-content/src/main/resources/mapper/content/VlogMapperCustom.xml +++ b/ruoyi-modules/ruoyi-content/src/main/resources/mapper/content/VlogMapperCustom.xml @@ -58,6 +58,12 @@ #{memberId} + + AND v.id IN + + #{vlogId} + + ORDER BY v.create_time DESC diff --git a/ruoyi-modules/ruoyi-member/src/main/java/com/wzj/soopin/member/service/IMemberService.java b/ruoyi-modules/ruoyi-member/src/main/java/com/wzj/soopin/member/service/IMemberService.java index 0f0bec90a..8c4602470 100644 --- a/ruoyi-modules/ruoyi-member/src/main/java/com/wzj/soopin/member/service/IMemberService.java +++ b/ruoyi-modules/ruoyi-member/src/main/java/com/wzj/soopin/member/service/IMemberService.java @@ -70,4 +70,7 @@ public interface IMemberService extends IService { String updateWechat(MemberBO bo); + + + } diff --git a/ruoyi-modules/ruoyi-order/src/main/java/com/wzj/soopin/order/service/impl/OrderItemServiceImpl.java b/ruoyi-modules/ruoyi-order/src/main/java/com/wzj/soopin/order/service/impl/OrderItemServiceImpl.java index 5b8f70694..fbb22f33f 100644 --- a/ruoyi-modules/ruoyi-order/src/main/java/com/wzj/soopin/order/service/impl/OrderItemServiceImpl.java +++ b/ruoyi-modules/ruoyi-order/src/main/java/com/wzj/soopin/order/service/impl/OrderItemServiceImpl.java @@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.wzj.soopin.order.domain.entity.OrderItem; import com.wzj.soopin.order.mapper.OrderItemMapper; import com.wzj.soopin.order.service.OrderItemService; +import org.dromara.common.redis.redis.RedisCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.ObjectMapper; @@ -15,6 +16,8 @@ import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; import java.util.HashMap; +import java.util.concurrent.TimeUnit; + import lombok.extern.slf4j.Slf4j; /** @@ -28,7 +31,7 @@ import lombok.extern.slf4j.Slf4j; public class OrderItemServiceImpl extends ServiceImpl implements OrderItemService { @Autowired - private RedisOperator redis; + private RedisCache redisCache; @Override public List findByOrderId(Long orderId) { @@ -54,7 +57,7 @@ public class OrderItemServiceImpl extends ServiceImpl