[fix]修改视频推送逻辑
This commit is contained in:
parent
e7c0d39162
commit
53c12bcf1b
@ -14,6 +14,7 @@ import io.swagger.annotations.ApiOperation;
|
||||
import io.swagger.annotations.ApiParam;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.core.domain.R;
|
||||
import org.dromara.common.redis.redis.RedisCache;
|
||||
import org.dromara.common.satoken.utils.LoginHelper;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
@ -32,6 +33,9 @@ public class AppCommentController {
|
||||
private RedisOperator redis;
|
||||
@Autowired
|
||||
private MsgService msgService;
|
||||
@Autowired
|
||||
private RedisCache redisCache;
|
||||
|
||||
@ApiOperation("查询视频评论列表")
|
||||
@PostMapping("/vlogComments")
|
||||
public R<Page<CommentVO>> queryVlogComments(
|
||||
@ -66,7 +70,7 @@ public class AppCommentController {
|
||||
commentService.createComment(bo);
|
||||
|
||||
// 2) 短视频评论总数 +1(Redis 优先)
|
||||
redis.increment(BaseInfoProperties.REDIS_VLOG_COMMENT_COUNTS + ":" + bo.getVlogId(), 1);
|
||||
redisCache.zSetIncrement(BaseInfoProperties.REDIS_VLOG_COMMENT_COUNTS , bo.getVlogId(), 1);
|
||||
|
||||
// 3) 发送站内消息:根评论 -> 通知视频作者;回复评论 -> 通知被回复用户
|
||||
if ("0".equals(bo.getFatherCommentId())) {
|
||||
|
@ -6,6 +6,7 @@ import com.wzj.soopin.content.domain.bo.*;
|
||||
import com.wzj.soopin.content.domain.po.MyLikedVlog;
|
||||
import com.wzj.soopin.content.domain.po.Vlog;
|
||||
import com.wzj.soopin.content.domain.vo.IndexVlogVO;
|
||||
import com.wzj.soopin.content.service.IVlogPullService;
|
||||
import com.wzj.soopin.content.service.VlogService;
|
||||
import com.wzj.soopin.content.service.VlogUploadService;
|
||||
import com.wzj.soopin.content.utils.PagedGridResult;
|
||||
@ -13,6 +14,7 @@ import com.wzj.soopin.content.utils.QcCloud;
|
||||
import com.wzj.soopin.content.utils.RedisOperator;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.dromara.common.core.domain.R;
|
||||
@ -22,6 +24,7 @@ import org.dromara.common.mq.domain.MQMessage;
|
||||
import org.dromara.common.mq.enums.MQMessageType;
|
||||
import org.dromara.common.mq.enums.MessageActionEnum;
|
||||
import org.dromara.common.mq.utils.MqUtil;
|
||||
import org.dromara.common.redis.redis.RedisCache;
|
||||
import org.dromara.common.satoken.utils.LoginHelper;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
@ -38,34 +41,30 @@ import static com.wzj.soopin.content.domain.base.BaseInfoProperties.*;
|
||||
@Api(tags = "VlogController 短视频相关业务功能的接口")
|
||||
@RequestMapping("/app/vlog")
|
||||
@RestController
|
||||
@AllArgsConstructor
|
||||
public class AppVlogController {
|
||||
@Autowired
|
||||
private VlogService vlogService;
|
||||
@Autowired
|
||||
private QcCloud qcCloud;
|
||||
@Autowired
|
||||
private VlogUploadService vlogUploadService;
|
||||
@Autowired
|
||||
private final VlogService vlogService;
|
||||
private final QcCloud qcCloud;
|
||||
private final VlogUploadService vlogUploadService;
|
||||
public final RedisCache cache;
|
||||
|
||||
private final IVlogPullService pullService;
|
||||
|
||||
public RedisOperator redis;
|
||||
@Tag(name = "首页视频列表")
|
||||
@PostMapping("/indexList")
|
||||
public R<Page<IndexVlogVO>> indexList(@RequestBody IndexListBO bo, @RequestBody Page page) {
|
||||
|
||||
try{
|
||||
LoginUser loginUser = LoginHelper.getLoginUser();
|
||||
if (loginUser == null) {
|
||||
throw new ServiceException("用户未登录");
|
||||
}
|
||||
bo.setUserId(String.valueOf(loginUser.getUserId()));
|
||||
}catch (Exception e){
|
||||
log.error("用户没登陆", e);
|
||||
LoginUser loginUser = LoginHelper.getLoginUser();
|
||||
if (loginUser == null) {
|
||||
throw new ServiceException("用户未登录");
|
||||
}
|
||||
Page<IndexVlogVO> pages = vlogService.getIndexVlogList(bo, page);
|
||||
bo.setUserId(String.valueOf(loginUser.getUserId()));
|
||||
Page<IndexVlogVO> pages = pullService.page(page);
|
||||
return R.ok(pages);
|
||||
}
|
||||
|
||||
@GetMapping("/detail/{vlogId}")
|
||||
public R<Object> detail( @PathVariable String vlogId) {
|
||||
public R<Object> detail(@PathVariable String vlogId) {
|
||||
|
||||
return R.ok(vlogService.getVlogDetailById(vlogId));
|
||||
}
|
||||
@ -99,7 +98,7 @@ public class AppVlogController {
|
||||
throw new ServiceException("用户未登录");
|
||||
}
|
||||
bo.setMyId(String.valueOf(loginUser.getUserId()));
|
||||
Page<IndexVlogVO> pages = vlogService.getMyFollowVlogList( page);
|
||||
Page<IndexVlogVO> pages = vlogService.getMyFollowVlogList(page);
|
||||
return R.ok(pages);
|
||||
}
|
||||
|
||||
@ -111,9 +110,10 @@ public class AppVlogController {
|
||||
throw new ServiceException("用户未登录");
|
||||
}
|
||||
bo.setMyId(String.valueOf(loginUser.getUserId()));
|
||||
Page<IndexVlogVO> pages = vlogService.getMyFriendVlogList( page);
|
||||
Page<IndexVlogVO> pages = vlogService.getMyFriendVlogList(page);
|
||||
return R.ok(pages);
|
||||
}
|
||||
|
||||
@PostMapping("publish")
|
||||
public R<Void> publish(@RequestBody VlogBO vlogBO) throws Exception {
|
||||
|
||||
@ -126,12 +126,12 @@ public class AppVlogController {
|
||||
|
||||
|
||||
String url = vlogBO.getUrl();
|
||||
log.info("未审核视频地址:"+url);
|
||||
log.info("未审核视频地址:" + url);
|
||||
String fileName = url.substring(url.lastIndexOf("/") + 1);
|
||||
log.info("视频文件名称:"+fileName);
|
||||
log.info("开始上传腾讯云点播:"+fileName);
|
||||
log.info("视频文件名称:" + fileName);
|
||||
log.info("开始上传腾讯云点播:" + fileName);
|
||||
String fileId = qcCloud.uploadViaTempFile(fileName);
|
||||
log.info("视频发布ID:"+fileId);
|
||||
log.info("视频发布ID:" + fileId);
|
||||
vlogBO.setFileId(fileId);
|
||||
// 删除minio文件
|
||||
// MinIOUtils.removeFile(minIOConfig.getBucketName(),fileName);
|
||||
@ -142,6 +142,7 @@ public class AppVlogController {
|
||||
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
@Tag(name = "我的公开视频列表")
|
||||
@PostMapping("/myPublicList")
|
||||
public R<Page<IndexVlogVO>> myPublicList(@RequestBody MyListBO bo, @RequestBody Page page) {
|
||||
@ -149,155 +150,50 @@ public class AppVlogController {
|
||||
return R.ok(pages);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private Integer nacosConuts=0;
|
||||
|
||||
|
||||
@PostMapping("like")
|
||||
@PostMapping("/like")
|
||||
public R<Void> like(@RequestBody VlogBO vlogBO) {
|
||||
LoginUser loginUser = LoginHelper.getLoginUser();
|
||||
if (loginUser == null) {
|
||||
throw new ServiceException("用户未登录");
|
||||
}
|
||||
|
||||
String userId = String.valueOf(loginUser.getUserId());
|
||||
String vlogId = vlogBO.getId();
|
||||
|
||||
|
||||
//获取vlog
|
||||
Vlog vlog = vlogService.getVlog(vlogId);
|
||||
if(vlog==null){
|
||||
throw new ServiceException("视频不存在");
|
||||
}
|
||||
// 我点赞的视频,关联关系保存到数据库
|
||||
vlogService.userLikeVlog(userId, vlogId);
|
||||
|
||||
// 点赞后,视频和视频发布者的获赞都会 +1
|
||||
redis.increment(REDIS_VLOGER_BE_LIKED_COUNTS + ":" + vlog.getMemberId(), 1);
|
||||
redis.increment(REDIS_VLOG_BE_LIKED_COUNTS + ":" + vlogId, 1);
|
||||
|
||||
// 我点赞的视频,需要在redis中保存关联关系
|
||||
redis.set(REDIS_USER_LIKE_VLOG + ":" + userId + ":" + vlogId, "1");
|
||||
|
||||
log.info("nacosConuts="+nacosConuts);
|
||||
|
||||
String countsStr = redis.get(REDIS_VLOG_BE_LIKED_COUNTS + ":" + vlogId);
|
||||
Integer counts=0;
|
||||
if (StringUtils.isNotBlank(countsStr)){
|
||||
|
||||
counts=Integer.valueOf(countsStr);
|
||||
if (counts>=nacosConuts){
|
||||
|
||||
vlogService.flushCounts(vlogId, counts);
|
||||
}
|
||||
}
|
||||
if (userId != null && vlog.getMemberId() != null && !userId.equals(vlog.getMemberId())) {
|
||||
// 新版:使用模板类型编号和参数
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put("userId", userId);
|
||||
params.put("nickname", loginUser.getNickname());
|
||||
params.put("action", MessageActionEnum.INTERACTION_LIKE.name());
|
||||
params.put("toUserId",vlog.getMemberId());
|
||||
MQMessage message = MQMessage.builder()
|
||||
.messageType(MQMessageType.IM.name())
|
||||
.data(params)
|
||||
.source("member")
|
||||
.build();
|
||||
// 关注消息
|
||||
MqUtil.sendIMMessage(message);
|
||||
|
||||
}
|
||||
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
|
||||
@PostMapping("unlike")
|
||||
|
||||
@PostMapping("/unlike")
|
||||
public R<Void> unlike(@RequestBody Map<String, String> params) {
|
||||
LoginUser loginUser = LoginHelper.getLoginUser();
|
||||
if (loginUser == null) {
|
||||
throw new ServiceException("用户未登录");
|
||||
}
|
||||
|
||||
String userId = String.valueOf(loginUser.getUserId());
|
||||
String vlogId = params.get("vlogId");
|
||||
|
||||
//获取vlog
|
||||
Vlog vlog = vlogService.getVlog(vlogId);
|
||||
if(vlog==null){
|
||||
throw new ServiceException("视频不存在");
|
||||
}
|
||||
|
||||
// 我取消点赞的视频,关联关系删除
|
||||
vlogService.userUnLikeVlog(userId, vlogId);
|
||||
|
||||
redis.decrement(REDIS_VLOGER_BE_LIKED_COUNTS + ":" + vlog.getMemberId(), 1);
|
||||
redis.decrement(REDIS_VLOG_BE_LIKED_COUNTS + ":" + vlogId, 1);
|
||||
redis.del(REDIS_USER_LIKE_VLOG + ":" + userId + ":" + vlogId);
|
||||
|
||||
return R.ok();
|
||||
}
|
||||
@GetMapping("/read/{vlogId}")
|
||||
public R<Void> read(@PathVariable String vlogId) {
|
||||
LoginUser loginUser = LoginHelper.getLoginUser();
|
||||
if (loginUser == null) {
|
||||
throw new ServiceException("用户未登录");
|
||||
}
|
||||
vlogService.readVlog(loginUser.getUserId(), vlogId);
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
@Tag(name = "手动触发缓存点赞最多视频")
|
||||
@PostMapping("/cacheTopLikedVlogs")
|
||||
public R<Void> cacheTopLikedVlogs(@RequestParam(defaultValue = "100") int limit) {
|
||||
try {
|
||||
vlogService.cacheTopLikedVlogs(limit);
|
||||
// vlogService.cacheTopLikedVlogs(limit);
|
||||
return R.ok();
|
||||
} catch (Exception e) {
|
||||
log.error("手动触发缓存点赞最多视频失败", e);
|
||||
return R.fail("缓存失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Tag(name = "获取缓存中的点赞最多视频")
|
||||
@GetMapping("/getTopLikedVlogs")
|
||||
public R<Object> getTopLikedVlogs(@RequestParam(defaultValue = "") String date,
|
||||
@RequestParam(defaultValue = "10") int pageSize,
|
||||
@RequestParam(defaultValue = "0") int pageNum) {
|
||||
try {
|
||||
String redisKey;
|
||||
if (StringUtils.isBlank(date)) {
|
||||
// 如果没有指定日期,使用今天的日期
|
||||
redisKey = "top_liked_vlogs:" + java.time.LocalDateTime.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd"));
|
||||
} else {
|
||||
redisKey = "top_liked_vlogs:" + date;
|
||||
}
|
||||
|
||||
String cachedData = redis.get(redisKey);
|
||||
List<Map<String, Object>> resultList = new ArrayList<>();
|
||||
|
||||
if (StringUtils.isNotBlank(cachedData)) {
|
||||
// 解析JSON数据
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
List<Map<String, Object>> vlogList = objectMapper.readValue(cachedData, new com.fasterxml.jackson.core.type.TypeReference<List<Map<String, Object>>>() {});
|
||||
|
||||
// 计算分页
|
||||
int startIndex = pageNum * pageSize;
|
||||
int endIndex = Math.min(startIndex + pageSize, vlogList.size());
|
||||
|
||||
if (startIndex < vlogList.size()) {
|
||||
// 从Redis缓存中获取指定页的数据
|
||||
resultList = vlogList.subList(startIndex, endIndex);
|
||||
log.info("从Redis缓存中获取了{}条视频数据", resultList.size());
|
||||
}
|
||||
}
|
||||
|
||||
// 如果Redis中的数据不足,从数据库随机查询补充
|
||||
if (resultList.size() < pageSize) {
|
||||
int needMore = pageSize - resultList.size();
|
||||
log.info("Redis缓存数据不足,需要从数据库随机查询{}条视频", needMore);
|
||||
|
||||
// 从数据库随机查询视频
|
||||
List<Map<String, Object>> randomVlogs = vlogService.getRandomVlogs(needMore);
|
||||
resultList.addAll(randomVlogs);
|
||||
log.info("从数据库随机查询了{}条视频", randomVlogs.size());
|
||||
}
|
||||
|
||||
return R.ok(resultList);
|
||||
} catch (Exception e) {
|
||||
log.error("获取缓存中的点赞最多视频失败", e);
|
||||
return R.fail("获取缓存失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -36,4 +36,9 @@ public interface GlobalConstants {
|
||||
* 会员 redis key
|
||||
*/
|
||||
String MEMBER_KEY = GLOBAL_REDIS_KEY + "member:";
|
||||
|
||||
/**
|
||||
* 视频 redis key
|
||||
*/
|
||||
String VLOG_KEY = GLOBAL_REDIS_KEY + "vlog:";
|
||||
}
|
||||
|
@ -48,6 +48,10 @@
|
||||
<version>2.15.2</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-client</artifactId>
|
||||
<version>5.3.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
@ -1,8 +1,13 @@
|
||||
package org.dromara.common.mq.utils;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
|
||||
import org.apache.rocketmq.client.consumer.LitePullConsumer;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.dromara.common.json.utils.JsonUtils;
|
||||
import org.dromara.common.mq.config.RocketMQConfig;
|
||||
@ -13,18 +18,40 @@ import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class MqUtil implements ApplicationContextAware {
|
||||
private static final String NAME_SERVER_ADDR = "localhost:9876";
|
||||
private static final String CONSUMER_GROUP = "TEST_PULL_GROUP";
|
||||
private static final String TOPIC = "TEST_TOPIC";
|
||||
private static RocketMQTemplate template;
|
||||
|
||||
private static Map<String, LitePullConsumer> consumerMap = new ConcurrentHashMap<>();
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
template = applicationContext.getBean(RocketMQTemplate.class);
|
||||
}
|
||||
|
||||
public void init() throws Exception {
|
||||
// 1. 创建消费者实例,指定消费者组
|
||||
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("YOUR_PULL_CONSUMER_GROUP");
|
||||
|
||||
consumer.setNamesrvAddr("localhost:9876"); // 替换为您的NameServer地址
|
||||
consumer.subscribe("YOUR_TOPIC", "*");
|
||||
consumer.start();
|
||||
// 3. 设置从何处开始消费(首次启动时)
|
||||
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
|
||||
// 4. 订阅主题和Tag
|
||||
consumer.subscribe("YOUR_TOPIC", "*"); // 使用*消费所有Tag
|
||||
// 5. 启动消费者
|
||||
consumer.start();
|
||||
System.out.println("LitePullConsumer 启动成功");
|
||||
}
|
||||
|
||||
//
|
||||
public static void sendMessage(String topic, MQMessage message) {
|
||||
public static void sendMessage(String destination, MQMessage message) {
|
||||
if (template == null) {
|
||||
log.error("RocketMQTemplate未初始化,无法发送消息");
|
||||
return;
|
||||
@ -32,10 +59,10 @@ public class MqUtil implements ApplicationContextAware {
|
||||
|
||||
try {
|
||||
String jsonMessage = JsonUtils.toJsonString(message);
|
||||
template.convertAndSend(topic, jsonMessage);
|
||||
log.info("发送消息到RocketMQ成功,topic: {}, message: {}", topic, jsonMessage);
|
||||
template.convertAndSend(destination, jsonMessage);
|
||||
log.info("发送消息到RocketMQ成功,topic: {}, message: {}", destination, jsonMessage);
|
||||
} catch (Exception e) {
|
||||
log.error("发送消息到RocketMQ失败,topic: {}, message: {}", topic, message, e);
|
||||
log.error("发送消息到RocketMQ失败,topic: {}, message: {}", destination, message, e);
|
||||
}
|
||||
}
|
||||
public static void sendIMMessage( MQMessage message) {
|
||||
@ -97,5 +124,4 @@ public class MqUtil implements ApplicationContextAware {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,333 @@
|
||||
package org.dromara.common.mq.utils;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
|
||||
import org.apache.rocketmq.client.consumer.PullResult;
|
||||
import org.apache.rocketmq.client.consumer.PullStatus;
|
||||
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
|
||||
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.common.message.MessageQueue;
|
||||
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 带超时自动销毁功能的用户级RocketMQ消费者管理器
|
||||
* 支持按用户隔离消费者,并自动清理超时未活动的实例
|
||||
*/
|
||||
@Component
|
||||
public class UserRocketMQConsumerManager {
|
||||
// 全局NameServer地址
|
||||
@Value("rocketmq.name-server")
|
||||
|
||||
|
||||
private String namesrvAddr="192.168.1.65:9876";
|
||||
|
||||
// 超时时间(毫秒),默认30分钟
|
||||
private long timeoutMillis = 30 * 60 * 1000;
|
||||
|
||||
// 定时检查间隔(毫秒),默认5分钟
|
||||
private long checkIntervalMillis = 5 * 60 * 1000;
|
||||
|
||||
// 存储用户与消费者的映射(线程安全)
|
||||
private final Map<String, UserConsumerContext> userConsumerMap = new ConcurrentHashMap<>();
|
||||
|
||||
// 存储每个用户的消费偏移量(用户ID -> 队列 -> 偏移量)
|
||||
private final Map<String, Map<MessageQueue, Long>> userOffsetMap = new ConcurrentHashMap<>();
|
||||
|
||||
// 定时任务执行器,用于检测超时消费者
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
private final String consumerGroup="user_consumer_group";
|
||||
private final String topic="MEMBER_VLOG_MSG";
|
||||
/**
|
||||
* 初始化管理器
|
||||
*/
|
||||
public UserRocketMQConsumerManager() {
|
||||
if (namesrvAddr == null || namesrvAddr.trim().isEmpty()) {
|
||||
throw new IllegalArgumentException("NameServer地址不能为空");
|
||||
}
|
||||
System.setProperty("rocketmq.namesrv.addr", namesrvAddr);
|
||||
// 启动定时清理任务
|
||||
startTimeoutChecker();
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动超时检查定时任务
|
||||
*/
|
||||
private void startTimeoutChecker() {
|
||||
scheduler.scheduleAtFixedRate(() -> {
|
||||
try {
|
||||
checkAndDestroyTimeoutConsumers();
|
||||
} catch (Exception e) {
|
||||
System.err.println("超时检查任务执行失败: " + e.getMessage());
|
||||
}
|
||||
}, checkIntervalMillis, checkIntervalMillis, TimeUnit.MILLISECONDS);
|
||||
System.out.println("超时检查任务已启动,检查间隔: " + checkIntervalMillis + "ms,超时时间: " + timeoutMillis + "ms");
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查并销毁超时未活动的消费者
|
||||
*/
|
||||
private void checkAndDestroyTimeoutConsumers() {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
List<String> timeoutUserIds = new ArrayList<>();
|
||||
|
||||
// 收集超时用户
|
||||
for (Map.Entry<String, UserConsumerContext> entry : userConsumerMap.entrySet()) {
|
||||
String userId = entry.getKey();
|
||||
UserConsumerContext context = entry.getValue();
|
||||
|
||||
if (currentTime - context.lastActiveTime > timeoutMillis) {
|
||||
timeoutUserIds.add(userId);
|
||||
}
|
||||
}
|
||||
|
||||
// 销毁超时消费者
|
||||
for (String userId : timeoutUserIds) {
|
||||
System.out.println("用户[" + userId + "]的消费者已超时未活动,自动销毁");
|
||||
destroyConsumer(userId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 为指定用户创建消费者实例
|
||||
* @param userId 用户唯一标识
|
||||
* @param consumerGroup 该用户的消费组
|
||||
* @param topic 消费的主题
|
||||
* @return 是否创建成功
|
||||
*/
|
||||
public boolean createConsumer(String userId, String consumerGroup, String topic) throws MQClientException {
|
||||
validateUserId(userId);
|
||||
|
||||
// 若用户已存在消费者,更新活动时间并返回
|
||||
if (userConsumerMap.containsKey(userId)) {
|
||||
UserConsumerContext context = userConsumerMap.get(userId);
|
||||
context.lastActiveTime = System.currentTimeMillis();
|
||||
System.out.println("用户[" + userId + "]的消费者已存在,更新活动时间");
|
||||
return false;
|
||||
}
|
||||
|
||||
// 创建消费者
|
||||
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
|
||||
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||||
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
|
||||
consumer.start();
|
||||
|
||||
// 初始化该用户的偏移量存储
|
||||
userOffsetMap.putIfAbsent(userId, new ConcurrentHashMap<>());
|
||||
|
||||
// 存储用户消费者上下文
|
||||
UserConsumerContext context = new UserConsumerContext(
|
||||
consumer,
|
||||
consumerGroup,
|
||||
topic,
|
||||
System.currentTimeMillis()
|
||||
);
|
||||
userConsumerMap.put(userId, context);
|
||||
System.out.println("用户[" + userId + "]的消费者创建成功,消费组: " + consumerGroup);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 为指定用户拉取消息(会更新活动时间)
|
||||
* @param userId 用户唯一标识
|
||||
* @param batchSize 每次拉取数量
|
||||
* @return 拉取到的消息列表
|
||||
*/
|
||||
public List<MessageExt> pullUserMessages(String userId , int batchSize)
|
||||
throws MQClientException, InterruptedException, MQBrokerException, RemotingException {
|
||||
validateUserId(userId);
|
||||
UserConsumerContext context = getUserConsumerContext(userId);
|
||||
|
||||
// 更新最后活动时间
|
||||
context.lastActiveTime = System.currentTimeMillis();
|
||||
|
||||
// 获取用户的消息队列
|
||||
Set<MessageQueue> mqs = context.consumer.fetchSubscribeMessageQueues(context.topic);
|
||||
List<MessageExt> allMessages = new ArrayList<>();
|
||||
|
||||
// 拉取所有队列的消息
|
||||
for (MessageQueue mq : mqs) {
|
||||
PullResult result = pullSingleQueueMessage(userId, context.consumer, mq , batchSize);
|
||||
if (result.getPullStatus() == PullStatus.FOUND) {
|
||||
allMessages.addAll(result.getMsgFoundList());
|
||||
// 更新用户的偏移量
|
||||
updateUserOffset(userId, mq, result.getNextBeginOffset());
|
||||
}
|
||||
}
|
||||
|
||||
return allMessages;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定用户的消费者上下文
|
||||
*/
|
||||
private UserConsumerContext getUserConsumerContext(String userId) throws MQClientException {
|
||||
UserConsumerContext context = userConsumerMap.get(userId);
|
||||
if (context == null) {
|
||||
createConsumer(userId, consumerGroup,topic);
|
||||
}
|
||||
return userConsumerMap.get(userId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 拉取单个队列的消息
|
||||
*/
|
||||
private PullResult pullSingleQueueMessage(String userId, DefaultMQPullConsumer consumer,
|
||||
MessageQueue mq, int batchSize)
|
||||
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
|
||||
// 获取该用户在该队列的偏移量
|
||||
long offset = getUserQueueOffset(userId, mq);
|
||||
return consumer.pull(mq, userId, offset, batchSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取用户在指定队列的消费偏移量
|
||||
*/
|
||||
private long getUserQueueOffset(String userId, MessageQueue mq) {
|
||||
Map<MessageQueue, Long> queueOffsetMap = userOffsetMap.get(userId);
|
||||
return queueOffsetMap.getOrDefault(mq, 0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新用户在指定队列的消费偏移量
|
||||
*/
|
||||
private void updateUserOffset(String userId, MessageQueue mq, long offset) {
|
||||
Map<MessageQueue, Long> queueOffsetMap = userOffsetMap.get(userId);
|
||||
queueOffsetMap.put(mq, offset);
|
||||
}
|
||||
|
||||
/**
|
||||
* 手动销毁指定用户的消费者
|
||||
* @param userId 用户唯一标识
|
||||
* @return 是否销毁成功
|
||||
*/
|
||||
public boolean destroyConsumer(String userId) {
|
||||
validateUserId(userId);
|
||||
UserConsumerContext context = userConsumerMap.remove(userId);
|
||||
if (context != null) {
|
||||
context.consumer.shutdown();
|
||||
userOffsetMap.remove(userId);
|
||||
System.out.println("用户[" + userId + "]的消费者已销毁,消费组: " + context.consumerGroup);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前所有用户的消费者列表
|
||||
*/
|
||||
public Set<String> getRegisteredUserIds() {
|
||||
return userConsumerMap.keySet();
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验用户ID
|
||||
*/
|
||||
private void validateUserId(String userId) {
|
||||
if (userId == null || userId.trim().isEmpty()) {
|
||||
throw new IllegalArgumentException("用户ID不能为空");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置超时时间(毫秒)
|
||||
*/
|
||||
public void setTimeoutMillis(long timeoutMillis) {
|
||||
if (timeoutMillis <= 0) {
|
||||
throw new IllegalArgumentException("超时时间必须大于0");
|
||||
}
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
System.out.println("已更新超时时间: " + timeoutMillis + "ms");
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置检查间隔(毫秒)
|
||||
*/
|
||||
public void setCheckIntervalMillis(long checkIntervalMillis) {
|
||||
if (checkIntervalMillis <= 0 || checkIntervalMillis > timeoutMillis) {
|
||||
throw new IllegalArgumentException("检查间隔必须大于0且小于等于超时时间");
|
||||
}
|
||||
this.checkIntervalMillis = checkIntervalMillis;
|
||||
// 重新启动定时任务
|
||||
scheduler.shutdownNow();
|
||||
startTimeoutChecker();
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭管理器,释放所有资源
|
||||
*/
|
||||
public void shutdown() {
|
||||
scheduler.shutdown();
|
||||
// 销毁所有消费者
|
||||
for (String userId : new ArrayList<>(userConsumerMap.keySet())) {
|
||||
destroyConsumer(userId);
|
||||
}
|
||||
System.out.println("消费者管理器已关闭");
|
||||
}
|
||||
|
||||
/**
|
||||
* 用户消费者上下文(存储消费者实例及活动时间)
|
||||
*/
|
||||
private static class UserConsumerContext {
|
||||
DefaultMQPullConsumer consumer;
|
||||
String consumerGroup;
|
||||
String topic;
|
||||
long createTime; // 创建时间
|
||||
long lastActiveTime; // 最后活动时间(拉取消息时间)
|
||||
|
||||
public UserConsumerContext(DefaultMQPullConsumer consumer, String consumerGroup,
|
||||
String topic, long createTime) {
|
||||
this.consumer = consumer;
|
||||
this.consumerGroup = consumerGroup;
|
||||
this.topic = topic;
|
||||
this.createTime = createTime;
|
||||
this.lastActiveTime = createTime; // 初始活动时间=创建时间
|
||||
}
|
||||
}
|
||||
|
||||
// 测试示例
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
try {
|
||||
// 初始化管理器(设置超时时间为10秒,检查间隔为5秒)
|
||||
UserRocketMQConsumerManager manager = new UserRocketMQConsumerManager( );
|
||||
manager.setTimeoutMillis(10 * 1000);
|
||||
manager.setCheckIntervalMillis(5 * 1000);
|
||||
|
||||
// 为用户1创建消费者
|
||||
manager.createConsumer("user1", "user1_group", "user_topic");
|
||||
System.out.println("当前注册用户: " + manager.getRegisteredUserIds());
|
||||
|
||||
// 用户1拉取消息(更新活动时间)
|
||||
System.out.println("\n用户1首次拉取消息:");
|
||||
manager.pullUserMessages("user1", 5);
|
||||
|
||||
// 等待6秒(未超时)
|
||||
Thread.sleep(6000);
|
||||
System.out.println("\n6秒后注册用户: " + manager.getRegisteredUserIds());
|
||||
|
||||
// 再等待5秒(累计11秒,超过10秒超时)
|
||||
Thread.sleep(5000);
|
||||
System.out.println("\n11秒后注册用户: " + manager.getRegisteredUserIds());
|
||||
|
||||
// 关闭管理器
|
||||
manager.shutdown();
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.sql.ShardingKey;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -121,7 +122,7 @@ public class RedisCache
|
||||
* 获得缓存的list对象
|
||||
*
|
||||
* @param key 缓存的键值
|
||||
* @return 缓存键值对应的数据
|
||||
* * @return 缓存键值对应的数据
|
||||
*/
|
||||
public <T> List<T> getCacheList(final String key)
|
||||
{
|
||||
@ -230,4 +231,97 @@ public class RedisCache
|
||||
}
|
||||
|
||||
|
||||
//zeset类型的增删改查
|
||||
/**
|
||||
* 新增zset
|
||||
* @param key
|
||||
* @param value
|
||||
* @param score
|
||||
*/
|
||||
public void zSetAdd(String key, String value, double score) {
|
||||
redisTemplate.opsForZSet().add(key, value, score);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询zset
|
||||
* @param key
|
||||
* @param start
|
||||
* @param end
|
||||
* @return
|
||||
*/
|
||||
public Set<String> zSetRange(String key, long start, long end) {
|
||||
return redisTemplate.opsForZSet().range(key, start, end);
|
||||
}
|
||||
/**
|
||||
* 删除zset
|
||||
* @param key
|
||||
* @param value
|
||||
*/
|
||||
public void zSetRemove(String key, String value) {
|
||||
redisTemplate.opsForZSet().remove(key, value);
|
||||
}
|
||||
/**
|
||||
* 查询zset分数
|
||||
* @param key
|
||||
* @param value
|
||||
* @return
|
||||
*/
|
||||
public Double zSetScore(String key, String value) {
|
||||
return redisTemplate.opsForZSet().score(key, value);
|
||||
}
|
||||
/**
|
||||
* 查询zset长度
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public Long zSetSize(String key) {
|
||||
return redisTemplate.opsForZSet().size(key);
|
||||
}
|
||||
/**
|
||||
* 查询zset排名
|
||||
* @param key
|
||||
* @param value
|
||||
* @return
|
||||
*/
|
||||
public Long zSetRank(String key, String value) {
|
||||
return redisTemplate.opsForZSet().rank(key, value);
|
||||
}
|
||||
/**
|
||||
* 排行榜
|
||||
* @param key
|
||||
* @param start
|
||||
* @param end
|
||||
* @return
|
||||
*/
|
||||
public Set<String> zSetReverseRange(String key, long start, long end) {
|
||||
return redisTemplate.opsForZSet().reverseRange(key, start, end);
|
||||
}
|
||||
/**
|
||||
* 增加增量
|
||||
* @param key
|
||||
* @param value
|
||||
* @param delta
|
||||
*/
|
||||
public void zSetIncrement(String key, String value, double delta) {
|
||||
redisTemplate.opsForZSet().incrementScore(key, value, delta);
|
||||
}
|
||||
/**
|
||||
* 减少数量
|
||||
* @param key
|
||||
* @param value
|
||||
* @param delta
|
||||
*/
|
||||
public void zSetDecrement(String key, String value, double delta) {
|
||||
redisTemplate.opsForZSet().incrementScore(key, value, -delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否有这个成员
|
||||
* @param key
|
||||
* @param value
|
||||
* @return
|
||||
*/
|
||||
public boolean zSetHasMember(String key, String value) {
|
||||
return redisTemplate.opsForZSet().rank(key, value) != null;
|
||||
}
|
||||
}
|
||||
|
@ -7,10 +7,7 @@ import org.redisson.api.*;
|
||||
import org.redisson.api.options.KeysScanOptions;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
@ -581,4 +578,6 @@ public class RedisUtils {
|
||||
RKeys rKeys = CLIENT.getKeys();
|
||||
return rKeys.countExists(key) > 0;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ import java.util.List;
|
||||
*/
|
||||
@Tag(name = "文章分类管理")
|
||||
@RestController
|
||||
@RequestMapping("/content/article/category")
|
||||
@RequestMapping("/cms/article/category")
|
||||
@RequiredArgsConstructor
|
||||
public class ArticleCategoryController {
|
||||
|
||||
|
@ -27,7 +27,7 @@ import java.util.List;
|
||||
*/
|
||||
@Tag(name = "内容管理")
|
||||
@RestController
|
||||
@RequestMapping("/content/article")
|
||||
@RequestMapping("/cms/article")
|
||||
@RequiredArgsConstructor
|
||||
public class ArticleController {
|
||||
|
||||
|
@ -5,6 +5,7 @@ package com.wzj.soopin.content.controller;
|
||||
import com.wzj.soopin.content.domain.base.BaseInfoProperties;
|
||||
import com.wzj.soopin.content.domain.bo.VlogBO;
|
||||
import com.wzj.soopin.content.enums.YesOrNo;
|
||||
import com.wzj.soopin.content.service.IVlogPushService;
|
||||
import org.dromara.common.core.domain.R;
|
||||
import com.wzj.soopin.content.service.VlogService;
|
||||
import com.wzj.soopin.content.service.VlogUploadService;
|
||||
@ -33,7 +34,7 @@ import java.util.ArrayList;
|
||||
|
||||
@Slf4j
|
||||
@Api(tags = "VlogController 短视频相关业务功能的接口")
|
||||
@RequestMapping("/vlog")
|
||||
@RequestMapping("/cms/vlog")
|
||||
@RestController
|
||||
|
||||
public class VlogController extends BaseInfoProperties {
|
||||
@ -45,6 +46,8 @@ public class VlogController extends BaseInfoProperties {
|
||||
private VlogUploadService vlogUploadService;
|
||||
@Autowired
|
||||
public RedisOperator redis;
|
||||
@Autowired
|
||||
private IVlogPushService vlogPushService;
|
||||
|
||||
@PostMapping("vodCallBack")
|
||||
public R<Void> vodCallBack(@RequestBody Map<String, Object> callbackData) {
|
||||
@ -194,7 +197,13 @@ public class VlogController extends BaseInfoProperties {
|
||||
public R<Integer> totalLikedCounts(@RequestParam String vlogId) {
|
||||
return R.ok(vlogService.getVlogBeLikedCounts(vlogId));
|
||||
}
|
||||
|
||||
|
||||
@GetMapping("/push/hot")
|
||||
public R pushHot(@RequestParam int count) {
|
||||
return R.ok(vlogPushService.cacheTopLikedVlogs(count));
|
||||
}
|
||||
@GetMapping("/push/member")
|
||||
public R pushMember(@RequestParam int count,@RequestParam Long memberId) {
|
||||
return R.ok(vlogPushService.pushVlogToMember(count,memberId));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ import com.wzj.soopin.content.enums.MessageEnum;
|
||||
|
||||
@Slf4j
|
||||
@Api(tags = "管理端-评论管理接口")
|
||||
@RequestMapping("/comment")
|
||||
@RequestMapping("/cms/comment")
|
||||
@RestController
|
||||
public class CommentController extends BaseController {
|
||||
|
||||
|
@ -24,7 +24,7 @@ import java.util.*;
|
||||
|
||||
@Slf4j
|
||||
@Api(tags = "管理端-视频")
|
||||
@RequestMapping("/video")
|
||||
@RequestMapping("/cms/video")
|
||||
@RestController
|
||||
public class VlogUploadController extends BaseInfoProperties {
|
||||
|
||||
|
@ -168,4 +168,8 @@ public interface VlogMapper extends BaseMapper<Vlog> {
|
||||
"ORDER BY RAND() " +
|
||||
"LIMIT #{limit}")
|
||||
List<Map<String, Object>> selectRandomVlogs(@Param("limit") int limit);
|
||||
|
||||
|
||||
IPage<String> getVlogForUser(Page<IndexVlogVO> page, @Param("memberId") Long memberId);
|
||||
int readVlog(@Param("memberId") Long memberId, @Param("vlogId") String vlogId);
|
||||
}
|
||||
|
@ -0,0 +1,12 @@
|
||||
package com.wzj.soopin.content.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.wzj.soopin.content.domain.vo.IndexVlogVO;
|
||||
|
||||
public interface IVlogPullService {
|
||||
|
||||
/**
|
||||
* 从视频库中拉取视频
|
||||
*/
|
||||
Page<IndexVlogVO> page(Page<IndexVlogVO> page);
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package com.wzj.soopin.content.service;
|
||||
|
||||
/**
|
||||
* 视频推送服务
|
||||
* @author wqx
|
||||
* @date 2023-12-20
|
||||
*/
|
||||
public interface IVlogPushService {
|
||||
boolean cacheTopLikedVlogs(int limit) ;
|
||||
|
||||
boolean pushVlogToMember(int count,Long memberId);
|
||||
}
|
@ -2,6 +2,7 @@ package com.wzj.soopin.content.service;
|
||||
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import com.wzj.soopin.content.domain.bo.*;
|
||||
import com.wzj.soopin.content.domain.po.Vlog;
|
||||
import com.wzj.soopin.content.domain.vo.IndexVlogVO;
|
||||
@ -10,7 +11,7 @@ import com.wzj.soopin.content.utils.PagedGridResult;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public interface VlogService {
|
||||
public interface VlogService extends IService<Vlog> {
|
||||
/**
|
||||
* 修改视频首帧图
|
||||
*/
|
||||
@ -25,10 +26,6 @@ public interface VlogService {
|
||||
*/
|
||||
public void createVlog(VlogBO vlogBO);
|
||||
|
||||
/**
|
||||
* 查询首页/搜索的vlog列表
|
||||
*/
|
||||
public Page<IndexVlogVO> getIndexVlogList(IndexListBO bo, Page page);
|
||||
|
||||
/**
|
||||
* 根据视频主键查询vlog
|
||||
@ -115,10 +112,6 @@ public interface VlogService {
|
||||
*/
|
||||
List<Map<String, Object>> getLikedUsers(String vlogId);
|
||||
|
||||
/**
|
||||
* 获取视频点赞数
|
||||
*/
|
||||
int getLikeCounts(String vlogId);
|
||||
|
||||
/**
|
||||
* 获取视频上传者信息
|
||||
@ -135,11 +128,6 @@ public interface VlogService {
|
||||
*/
|
||||
IPage<Map<String, Object>> getVlogListByMobile(Page<Map<String, Object>> page, VlogBO vlogBO);
|
||||
|
||||
/**
|
||||
* 查询点赞最多的视频列表并存储到Redis
|
||||
* @param limit 查询数量限制
|
||||
*/
|
||||
void cacheTopLikedVlogs(int limit);
|
||||
|
||||
/**
|
||||
* 随机查询视频列表
|
||||
@ -147,4 +135,10 @@ public interface VlogService {
|
||||
* @return 随机视频列表
|
||||
*/
|
||||
List<Map<String, Object>> getRandomVlogs(int limit);
|
||||
|
||||
IPage<String> getVlogForUser(Page<IndexVlogVO> page, Long memberId);
|
||||
int readVlog(Long memberId, String vlogId);
|
||||
|
||||
List<IndexVlogVO> getIndexVlogList(Map<String, Object> paramMap);
|
||||
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ public class MsgServiceImpl extends BaseInfoProperties implements MsgService {
|
||||
map = new HashMap<>();
|
||||
}
|
||||
|
||||
String relationship = redis.get(REDIS_FANS_AND_VLOGGER_RELATIONSHIP + ":" + msg.getToUserId() + ":" + msg.getFromUserId());
|
||||
String relationship = redis.getCacheObject(REDIS_FANS_AND_VLOGGER_RELATIONSHIP + ":" + msg.getToUserId() + ":" + msg.getFromUserId());
|
||||
if (StringUtils.isNotBlank(relationship) && relationship.equalsIgnoreCase("1")) {
|
||||
map.put("isFriend", true);
|
||||
} else {
|
||||
|
@ -0,0 +1,67 @@
|
||||
package com.wzj.soopin.content.service.impl;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
import com.wzj.soopin.content.service.IVlogPullService;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.wzj.soopin.content.domain.vo.IndexVlogVO;
|
||||
import com.wzj.soopin.content.service.VlogService;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.dromara.common.core.domain.model.LoginUser;
|
||||
import org.dromara.common.core.exception.ServiceException;
|
||||
import org.dromara.common.json.utils.JsonUtils;
|
||||
import org.dromara.common.mq.domain.MQMessage;
|
||||
import org.dromara.common.mq.utils.UserRocketMQConsumerManager;
|
||||
import org.dromara.common.satoken.utils.LoginHelper;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
public class VlogPullServiceImpl implements IVlogPullService {
|
||||
|
||||
private final UserRocketMQConsumerManager mqConsumerManager;
|
||||
private final VlogService vlogService;
|
||||
|
||||
@Override
|
||||
public Page<IndexVlogVO> page(Page<IndexVlogVO> page) {
|
||||
|
||||
List<MessageExt> messageExts = new ArrayList<>();
|
||||
|
||||
try {
|
||||
LoginUser loginUser = LoginHelper.getLoginUser();
|
||||
if (loginUser == null) {
|
||||
messageExts = mqConsumerManager.pullUserMessages("hot", (int) page.getSize());
|
||||
} else {
|
||||
messageExts = mqConsumerManager.pullUserMessages(loginUser.getUserId() + "", (int) page.getSize());
|
||||
|
||||
}
|
||||
List<String> ids=messageExts.stream().map(messageExt -> {
|
||||
MQMessage mqMessage = JsonUtils.parseObject(messageExt.getBody(), MQMessage.class);
|
||||
return mqMessage.getData().toString();
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
if(ids.size()>0){
|
||||
Map<String, Object> paramMap = new HashMap<>();
|
||||
paramMap.put("ids", ids);
|
||||
List<IndexVlogVO> vlogVOList = vlogService.getIndexVlogList(paramMap);
|
||||
return page.setRecords(vlogVOList);
|
||||
}else{
|
||||
return page;
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("拉取视频失败", e);
|
||||
}
|
||||
return page;
|
||||
}
|
||||
}
|
@ -0,0 +1,112 @@
|
||||
package com.wzj.soopin.content.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.wzj.soopin.content.domain.po.Vlog;
|
||||
import com.wzj.soopin.content.domain.vo.IndexVlogVO;
|
||||
import com.wzj.soopin.content.domain.vo.VlogerVO;
|
||||
import com.wzj.soopin.content.service.IVlogPushService;
|
||||
import com.wzj.soopin.content.service.VlogService;
|
||||
import com.wzj.soopin.member.domain.po.Member;
|
||||
import com.wzj.soopin.member.domain.vo.MemberVO;
|
||||
import com.wzj.soopin.member.service.IMemberService;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.dromara.common.core.utils.MapstructUtils;
|
||||
import org.dromara.common.mq.domain.MQMessage;
|
||||
import org.dromara.common.mq.utils.MqUtil;
|
||||
import org.dromara.common.redis.redis.RedisCache;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.wzj.soopin.content.domain.base.BaseInfoProperties.*;
|
||||
|
||||
@Service
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
public class VlogPushServiceImpl implements IVlogPushService {
|
||||
|
||||
private final RedisCache redisCache;
|
||||
private final VlogService vlogService;
|
||||
|
||||
private final IMemberService memberService;
|
||||
|
||||
@Override
|
||||
public boolean cacheTopLikedVlogs(int limit ) {
|
||||
try {
|
||||
// 1. 获取 ZSet 中所有成员(按分数升序排列)
|
||||
// 这里我们只关心成员(member),不关心分数(score),所以使用 range(key, start, end)
|
||||
log.info("开始查询点赞最多的{}条视频", limit);
|
||||
Set<String> rankSet = redisCache.zSetRange(REDIS_VLOG_BE_LIKED_COUNTS, 0, limit);
|
||||
if (rankSet == null ||rankSet.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if(rankSet!=null&&rankSet.size()<limit){
|
||||
//从数据库中获取差量数据
|
||||
Page<Vlog> vlogPage = new Page<>(1, limit-rankSet.size());
|
||||
vlogPage = vlogService.page(vlogPage, new LambdaQueryWrapper<Vlog>()
|
||||
.orderByDesc(Vlog::getLikeCounts));
|
||||
Set<String> vlogIds = vlogPage.getRecords().stream().map(vlog -> vlog.getId()).collect(Collectors.toSet());
|
||||
rankSet.addAll(vlogIds);
|
||||
}
|
||||
// 3. 将 Set 转换为 List 以便打乱顺序
|
||||
List<String> vlogList = new ArrayList<>(rankSet);
|
||||
|
||||
// 4. 使用 Collections.shuffle 打乱 List 的顺序
|
||||
Collections.shuffle(vlogList);
|
||||
vlogList.stream().forEach(vlogId -> {
|
||||
|
||||
//将数据发送的mq的热点数据队列
|
||||
MQMessage message = MQMessage.builder()
|
||||
.topic("MEMBER_VLOG_MSG")
|
||||
.tag("hot")
|
||||
.messageType("json")
|
||||
.data(vlogId)
|
||||
.source("vlog_service")
|
||||
.sendTime(LocalDateTime.now())
|
||||
.build();
|
||||
MqUtil.sendMessage("MEMBER_VLOG_MSG", message);
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("缓存点赞最多视频到Redis失败", e);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean pushVlogToMember(int count,Long memberId ) {
|
||||
|
||||
Member member = memberService.getById(memberId);
|
||||
String cityCode = member.getCity();
|
||||
IPage<String> vlogPage=vlogService.getVlogForUser(new Page<>(1, 100), memberId);
|
||||
if(CollectionUtils.isEmpty(vlogPage.getRecords())){
|
||||
return false;
|
||||
}
|
||||
vlogPage.getRecords().stream().forEach(vlogId -> {
|
||||
//将数据发送的mq的热点数据队列
|
||||
MQMessage message = MQMessage.builder()
|
||||
.topic("MEMBER_VLOG_MSG")
|
||||
.tag(memberId+"")
|
||||
.messageType("json")
|
||||
.data(vlogId)
|
||||
.source("vlog_service")
|
||||
.sendTime(LocalDateTime.now())
|
||||
.build();
|
||||
MqUtil.sendMessage("MEMBER_VLOG_MSG"+":"+memberId, message);
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
@ -14,6 +14,7 @@ import com.wzj.soopin.content.utils.TencentCloudUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.mybatis.core.page.PageQuery;
|
||||
import org.dromara.common.mybatis.core.page.TableDataInfo;
|
||||
import org.dromara.common.redis.redis.RedisCache;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@ -36,7 +37,7 @@ public class VlogUploadServiceImpl implements VlogUploadService {
|
||||
@Autowired
|
||||
private com.wzj.soopin.content.service.CommentService commentService;
|
||||
@Autowired
|
||||
private com.wzj.soopin.content.utils.RedisOperator redis;
|
||||
private RedisCache redisCache;
|
||||
@Autowired
|
||||
private com.wzj.soopin.content.mapper.VlogMapperCustom vlogMapperCustom;
|
||||
|
||||
@ -301,26 +302,11 @@ public class VlogUploadServiceImpl implements VlogUploadService {
|
||||
}
|
||||
|
||||
// 2. 点赞数量:优先 Redis,无则 MySQL
|
||||
String likeCountsStr = redis.get(com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_BE_LIKED_COUNTS + ":" + vlogId);
|
||||
int likeCounts;
|
||||
if (likeCountsStr != null) {
|
||||
likeCounts = Integer.parseInt(likeCountsStr);
|
||||
} else if (vlog != null && vlog.getLikeCounts() != null) {
|
||||
likeCounts = vlog.getLikeCounts();
|
||||
} else {
|
||||
likeCounts = 0;
|
||||
}
|
||||
int likeCounts=redisCache.zSetScore(com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_BE_LIKED_COUNTS , vlogId).intValue();
|
||||
|
||||
// 3. 评论数量:优先 Redis,无则 MySQL
|
||||
String commentCountsStr = redis.get(com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_COMMENT_COUNTS + ":" + vlogId);
|
||||
int commentCounts;
|
||||
if (commentCountsStr != null) {
|
||||
commentCounts = Integer.parseInt(commentCountsStr);
|
||||
} else if (vlog != null && vlog.getCommentsCounts() != null) {
|
||||
commentCounts = vlog.getCommentsCounts();
|
||||
} else {
|
||||
commentCounts = 0;
|
||||
}
|
||||
int commentCounts=redisCache.zSetScore(com.wzj.soopin.content.domain.base.BaseInfoProperties.REDIS_VLOG_COMMENT_COUNTS , vlogId).intValue();
|
||||
|
||||
|
||||
// 4. 评论内容:只查 MySQL
|
||||
List<com.wzj.soopin.content.domain.vo.CommentVO> commentList = new ArrayList<>();
|
||||
|
@ -27,7 +27,7 @@ public class VlogScheduledTask {
|
||||
public void jobExecute() {
|
||||
log.info("开始执行定时任务:查询点赞最多的100条视频并存储到Redis");
|
||||
try {
|
||||
vlogService.cacheTopLikedVlogs(100);
|
||||
// vlogService.cacheTopLikedVlogs(100);
|
||||
log.info("定时任务执行完成:成功缓存点赞最多的100条视频到Redis");
|
||||
} catch (Exception e) {
|
||||
log.error("定时任务执行失败:缓存点赞最多视频到Redis时发生异常", e);
|
||||
|
@ -24,286 +24,286 @@ public class RedisOperator {
|
||||
private StringRedisTemplate redisTemplate;
|
||||
|
||||
// Key(键),简单的key-value操作
|
||||
|
||||
/**
|
||||
* 判断key是否存在
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public boolean keyIsExist(String key) {
|
||||
return redisTemplate.hasKey(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:TTL key,以秒为单位,返回给定 key的剩余生存时间(TTL, time to live)。
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public long ttl(String key) {
|
||||
return redisTemplate.getExpire(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:expire 设置过期时间,单位秒
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public void expire(String key, long timeout) {
|
||||
redisTemplate.expire(key, timeout, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:increment key,增加key一次
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public long increment(String key, long delta) {
|
||||
return redisTemplate.opsForValue().increment(key, delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* 累加,使用hash
|
||||
*/
|
||||
public long incrementHash(String name, String key, long delta) {
|
||||
return redisTemplate.opsForHash().increment(name, key, delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* 累减,使用hash
|
||||
*/
|
||||
public long decrementHash(String name, String key, long delta) {
|
||||
delta = delta * (-1);
|
||||
return redisTemplate.opsForHash().increment(name, key, delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* hash 设置value
|
||||
*/
|
||||
public void setHashValue(String name, String key, String value) {
|
||||
redisTemplate.opsForHash().put(name, key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* hash 获得value
|
||||
*/
|
||||
public String getHashValue(String name, String key) {
|
||||
return (String)redisTemplate.opsForHash().get(name, key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:decrement key,减少key一次
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public long decrement(String key, long delta) {
|
||||
return redisTemplate.opsForValue().decrement(key, delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:KEYS pattern,查找所有符合给定模式 pattern的 key
|
||||
*/
|
||||
public Set<String> keys(String pattern) {
|
||||
return redisTemplate.keys(pattern);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:DEL key,删除一个key
|
||||
*
|
||||
* @param key
|
||||
*/
|
||||
public void del(String key) {
|
||||
redisTemplate.delete(key);
|
||||
}
|
||||
|
||||
// String(字符串)
|
||||
|
||||
/**
|
||||
* 实现命令:SET key value,设置一个key-value(将字符串值 value关联到 key)
|
||||
*
|
||||
* @param key
|
||||
* @param value
|
||||
*/
|
||||
public void set(String key, String value) {
|
||||
redisTemplate.opsForValue().set(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:SET key value EX seconds,设置key-value和超时时间(秒)
|
||||
*
|
||||
* @param key
|
||||
* @param value
|
||||
* @param timeout
|
||||
* (以秒为单位)
|
||||
*/
|
||||
public void set(String key, String value, long timeout) {
|
||||
redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 如果key不存在,则设置,如果存在,则报错
|
||||
* @param key
|
||||
* @param value
|
||||
*/
|
||||
public void setnx60s(String key, String value) {
|
||||
redisTemplate.opsForValue().setIfAbsent(key, value, 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 如果key不存在,则设置,如果存在,则报错
|
||||
* @param key
|
||||
* @param value
|
||||
*/
|
||||
public void setnx(String key, String value) {
|
||||
redisTemplate.opsForValue().setIfAbsent(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:GET key,返回 key所关联的字符串值。
|
||||
*
|
||||
* @param key
|
||||
* @return value
|
||||
*/
|
||||
public String get(String key) {
|
||||
return (String)redisTemplate.opsForValue().get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量查询,对应mget
|
||||
* @param keys
|
||||
* @return
|
||||
*/
|
||||
public List<String> mget(List<String> keys) {
|
||||
return redisTemplate.opsForValue().multiGet(keys);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量查询,管道pipeline
|
||||
* @param keys
|
||||
* @return
|
||||
*/
|
||||
public List<Object> batchGet(List<String> keys) {
|
||||
|
||||
// nginx -> keepalive
|
||||
// redis -> pipeline
|
||||
|
||||
List<Object> result = redisTemplate.executePipelined(new RedisCallback<String>() {
|
||||
@Override
|
||||
public String doInRedis(RedisConnection connection) throws DataAccessException {
|
||||
StringRedisConnection src = (StringRedisConnection)connection;
|
||||
|
||||
for (String k : keys) {
|
||||
src.get(k);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
// Hash(哈希表)
|
||||
|
||||
/**
|
||||
* 实现命令:HSET key field value,将哈希表 key中的域 field的值设为 value
|
||||
*
|
||||
* @param key
|
||||
* @param field
|
||||
* @param value
|
||||
*/
|
||||
public void hset(String key, String field, Object value) {
|
||||
redisTemplate.opsForHash().put(key, field, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:HGET key field,返回哈希表 key中给定域 field的值
|
||||
*
|
||||
* @param key
|
||||
* @param field
|
||||
* @return
|
||||
*/
|
||||
public String hget(String key, String field) {
|
||||
return (String) redisTemplate.opsForHash().get(key, field);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:HDEL key field [field ...],删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略。
|
||||
*
|
||||
* @param key
|
||||
* @param fields
|
||||
*/
|
||||
public void hdel(String key, Object... fields) {
|
||||
redisTemplate.opsForHash().delete(key, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:HGETALL key,返回哈希表 key中,所有的域和值。
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public Map<Object, Object> hgetall(String key) {
|
||||
return redisTemplate.opsForHash().entries(key);
|
||||
}
|
||||
|
||||
// List(列表)
|
||||
|
||||
/**
|
||||
* 实现命令:LPUSH key value,将一个值 value插入到列表 key的表头
|
||||
*
|
||||
* @param key
|
||||
* @param value
|
||||
* @return 执行 LPUSH命令后,列表的长度。
|
||||
*/
|
||||
public long lpush(String key, String value) {
|
||||
return redisTemplate.opsForList().leftPush(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:LPOP key,移除并返回列表 key的头元素。
|
||||
*
|
||||
* @param key
|
||||
* @return 列表key的头元素。
|
||||
*/
|
||||
public String lpop(String key) {
|
||||
return (String)redisTemplate.opsForList().leftPop(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 实现命令:RPUSH key value,将一个值 value插入到列表 key的表尾(最右边)。
|
||||
*
|
||||
* @param key
|
||||
* @param value
|
||||
* @return 执行 LPUSH命令后,列表的长度。
|
||||
*/
|
||||
public long rpush(String key, String value) {
|
||||
return redisTemplate.opsForList().rightPush(key, value);
|
||||
}
|
||||
// List(列表)
|
||||
/**
|
||||
* 实现命令:LRANGE key start stop,返回列表key中指定区间内的元素
|
||||
*
|
||||
* @param key Redis key
|
||||
* @param start 开始索引
|
||||
* @param stop 结束索引
|
||||
* @return 返回指定区间的元素
|
||||
*/
|
||||
public List<String> lrange(String key, long start, long stop) {
|
||||
return redisTemplate.opsForList().range(key, start, stop);
|
||||
}
|
||||
/**
|
||||
* 实现命令:LREM key count value,移除列表中与 value 相等的元素
|
||||
*
|
||||
* @param key Redis key
|
||||
* @param count 删除的数量(正数表示从头部开始删除,负数从尾部,0表示删除全部匹配项)
|
||||
* @param value 要删除的元素值
|
||||
* @return 被删除的元素个数
|
||||
*/
|
||||
public Long lrem(String key, long count, Object value) {
|
||||
return redisTemplate.opsForList().remove(key, count, value);
|
||||
}
|
||||
//
|
||||
// /**
|
||||
// * 判断key是否存在
|
||||
// * @param key
|
||||
// * @return
|
||||
// */
|
||||
// public boolean keyIsExist(String key) {
|
||||
// return redisTemplate.hasKey(key);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:TTL key,以秒为单位,返回给定 key的剩余生存时间(TTL, time to live)。
|
||||
// *
|
||||
// * @param key
|
||||
// * @return
|
||||
// */
|
||||
// public long ttl(String key) {
|
||||
// return redisTemplate.getExpire(key);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:expire 设置过期时间,单位秒
|
||||
// *
|
||||
// * @param key
|
||||
// * @return
|
||||
// */
|
||||
// public void expire(String key, long timeout) {
|
||||
// redisTemplate.expire(key, timeout, TimeUnit.SECONDS);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:increment key,增加key一次
|
||||
// *
|
||||
// * @param key
|
||||
// * @return
|
||||
// */
|
||||
// public long increment(String key, long delta) {
|
||||
// return redisTemplate.opsForValue().increment(key, delta);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 累加,使用hash
|
||||
// */
|
||||
// public long incrementHash(String name, String key, long delta) {
|
||||
// return redisTemplate.opsForHash().increment(name, key, delta);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 累减,使用hash
|
||||
// */
|
||||
// public long decrementHash(String name, String key, long delta) {
|
||||
// delta = delta * (-1);
|
||||
// return redisTemplate.opsForHash().increment(name, key, delta);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * hash 设置value
|
||||
// */
|
||||
// public void setHashValue(String name, String key, String value) {
|
||||
// redisTemplate.opsForHash().put(name, key, value);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * hash 获得value
|
||||
// */
|
||||
// public String getHashValue(String name, String key) {
|
||||
// return (String)redisTemplate.opsForHash().get(name, key);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:decrement key,减少key一次
|
||||
// *
|
||||
// * @param key
|
||||
// * @return
|
||||
// */
|
||||
// public long decrement(String key, long delta) {
|
||||
// return redisTemplate.opsForValue().decrement(key, delta);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:KEYS pattern,查找所有符合给定模式 pattern的 key
|
||||
// */
|
||||
// public Set<String> keys(String pattern) {
|
||||
// return redisTemplate.keys(pattern);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:DEL key,删除一个key
|
||||
// *
|
||||
// * @param key
|
||||
// */
|
||||
// public void del(String key) {
|
||||
// redisTemplate.delete(key);
|
||||
// }
|
||||
//
|
||||
// // String(字符串)
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:SET key value,设置一个key-value(将字符串值 value关联到 key)
|
||||
// *
|
||||
// * @param key
|
||||
// * @param value
|
||||
// */
|
||||
// public void set(String key, String value) {
|
||||
// redisTemplate.opsForValue().set(key, value);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:SET key value EX seconds,设置key-value和超时时间(秒)
|
||||
// *
|
||||
// * @param key
|
||||
// * @param value
|
||||
// * @param timeout
|
||||
// * (以秒为单位)
|
||||
// */
|
||||
// public void set(String key, String value, long timeout) {
|
||||
// redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.SECONDS);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 如果key不存在,则设置,如果存在,则报错
|
||||
// * @param key
|
||||
// * @param value
|
||||
// */
|
||||
// public void setnx60s(String key, String value) {
|
||||
// redisTemplate.opsForValue().setIfAbsent(key, value, 60, TimeUnit.SECONDS);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 如果key不存在,则设置,如果存在,则报错
|
||||
// * @param key
|
||||
// * @param value
|
||||
// */
|
||||
// public void setnx(String key, String value) {
|
||||
// redisTemplate.opsForValue().setIfAbsent(key, value);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:GET key,返回 key所关联的字符串值。
|
||||
// *
|
||||
// * @param key
|
||||
// * @return value
|
||||
// */
|
||||
// public String get(String key) {
|
||||
// return (String)redisTemplate.opsForValue().get(key);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 批量查询,对应mget
|
||||
// * @param keys
|
||||
// * @return
|
||||
// */
|
||||
// public List<String> mget(List<String> keys) {
|
||||
// return redisTemplate.opsForValue().multiGet(keys);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 批量查询,管道pipeline
|
||||
// * @param keys
|
||||
// * @return
|
||||
// */
|
||||
// public List<Object> batchGet(List<String> keys) {
|
||||
//
|
||||
//// nginx -> keepalive
|
||||
//// redis -> pipeline
|
||||
//
|
||||
// List<Object> result = redisTemplate.executePipelined(new RedisCallback<String>() {
|
||||
// @Override
|
||||
// public String doInRedis(RedisConnection connection) throws DataAccessException {
|
||||
// StringRedisConnection src = (StringRedisConnection)connection;
|
||||
//
|
||||
// for (String k : keys) {
|
||||
// src.get(k);
|
||||
// }
|
||||
// return null;
|
||||
// }
|
||||
// });
|
||||
//
|
||||
// return result;
|
||||
// }
|
||||
//
|
||||
//
|
||||
// // Hash(哈希表)
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:HSET key field value,将哈希表 key中的域 field的值设为 value
|
||||
// *
|
||||
// * @param key
|
||||
// * @param field
|
||||
// * @param value
|
||||
// */
|
||||
// public void hset(String key, String field, Object value) {
|
||||
// redisTemplate.opsForHash().put(key, field, value);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:HGET key field,返回哈希表 key中给定域 field的值
|
||||
// *
|
||||
// * @param key
|
||||
// * @param field
|
||||
// * @return
|
||||
// */
|
||||
// public String hget(String key, String field) {
|
||||
// return (String) redisTemplate.opsForHash().get(key, field);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:HDEL key field [field ...],删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略。
|
||||
// *
|
||||
// * @param key
|
||||
// * @param fields
|
||||
// */
|
||||
// public void hdel(String key, Object... fields) {
|
||||
// redisTemplate.opsForHash().delete(key, fields);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:HGETALL key,返回哈希表 key中,所有的域和值。
|
||||
// *
|
||||
// * @param key
|
||||
// * @return
|
||||
// */
|
||||
// public Map<Object, Object> hgetall(String key) {
|
||||
// return redisTemplate.opsForHash().entries(key);
|
||||
// }
|
||||
//
|
||||
// // List(列表)
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:LPUSH key value,将一个值 value插入到列表 key的表头
|
||||
// *
|
||||
// * @param key
|
||||
// * @param value
|
||||
// * @return 执行 LPUSH命令后,列表的长度。
|
||||
// */
|
||||
// public long lpush(String key, String value) {
|
||||
// return redisTemplate.opsForList().leftPush(key, value);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:LPOP key,移除并返回列表 key的头元素。
|
||||
// *
|
||||
// * @param key
|
||||
// * @return 列表key的头元素。
|
||||
// */
|
||||
// public String lpop(String key) {
|
||||
// return (String)redisTemplate.opsForList().leftPop(key);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 实现命令:RPUSH key value,将一个值 value插入到列表 key的表尾(最右边)。
|
||||
// *
|
||||
// * @param key
|
||||
// * @param value
|
||||
// * @return 执行 LPUSH命令后,列表的长度。
|
||||
// */
|
||||
// public long rpush(String key, String value) {
|
||||
// return redisTemplate.opsForList().rightPush(key, value);
|
||||
// }
|
||||
// // List(列表)
|
||||
// /**
|
||||
// * 实现命令:LRANGE key start stop,返回列表key中指定区间内的元素
|
||||
// *
|
||||
// * @param key Redis key
|
||||
// * @param start 开始索引
|
||||
// * @param stop 结束索引
|
||||
// * @return 返回指定区间的元素
|
||||
// */
|
||||
// public List<String> lrange(String key, long start, long stop) {
|
||||
// return redisTemplate.opsForList().range(key, start, stop);
|
||||
// }
|
||||
// /**
|
||||
// * 实现命令:LREM key count value,移除列表中与 value 相等的元素
|
||||
// *
|
||||
// * @param key Redis key
|
||||
// * @param count 删除的数量(正数表示从头部开始删除,负数从尾部,0表示删除全部匹配项)
|
||||
// * @param value 要删除的元素值
|
||||
// * @return 被删除的元素个数
|
||||
// */
|
||||
// public Long lrem(String key, long count, Object value) {
|
||||
// return redisTemplate.opsForList().remove(key, count, value);
|
||||
// }
|
||||
}
|
||||
|
@ -282,5 +282,27 @@
|
||||
</choose>
|
||||
</select>
|
||||
|
||||
<select id="getVlogForUser" resultType="java.lang.String">
|
||||
SELECT
|
||||
v.id
|
||||
FROM
|
||||
cont_vlog v
|
||||
WHERE
|
||||
NOT EXISTS (
|
||||
SELECT 1 FROM cms_vlog_member m
|
||||
WHERE m.vlog_id = v.id AND m.member_id = ${memberId}
|
||||
)
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM ums_block b
|
||||
WHERE b.block_member_id = v.id AND b.member_id = ${memberId}
|
||||
)
|
||||
ORDER BY
|
||||
v.create_time DESC
|
||||
|
||||
</select>
|
||||
|
||||
<update id="readVlog">
|
||||
insert into cms_vlog_member (vlog_id, member_id,status) values (#{vlogId}, #{memberId},1)
|
||||
</update>
|
||||
</mapper>
|
||||
|
||||
|
@ -58,6 +58,12 @@
|
||||
#{memberId}
|
||||
</foreach>
|
||||
</if>
|
||||
<if test="paramMap.ids != null and paramMap.ids.size() > 0">
|
||||
AND v.id IN
|
||||
<foreach collection="paramMap.ids" item="vlogId" open="(" separator="," close=")">
|
||||
#{vlogId}
|
||||
</foreach>
|
||||
</if>
|
||||
ORDER BY
|
||||
v.create_time
|
||||
DESC
|
||||
|
@ -70,4 +70,7 @@ public interface IMemberService extends IService<Member> {
|
||||
|
||||
|
||||
String updateWechat(MemberBO bo);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.wzj.soopin.order.domain.entity.OrderItem;
|
||||
import com.wzj.soopin.order.mapper.OrderItemMapper;
|
||||
import com.wzj.soopin.order.service.OrderItemService;
|
||||
import org.dromara.common.redis.redis.RedisCache;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
@ -15,6 +16,8 @@ import java.time.format.DateTimeFormatter;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
@ -28,7 +31,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
public class OrderItemServiceImpl extends ServiceImpl<OrderItemMapper, OrderItem> implements OrderItemService {
|
||||
|
||||
@Autowired
|
||||
private RedisOperator redis;
|
||||
private RedisCache redisCache;
|
||||
|
||||
@Override
|
||||
public List<OrderItem> findByOrderId(Long orderId) {
|
||||
@ -54,7 +57,7 @@ public class OrderItemServiceImpl extends ServiceImpl<OrderItemMapper, OrderItem
|
||||
|
||||
// 存储到Redis,设置24小时过期时间
|
||||
String redisKey = "top_trading_products:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
|
||||
redis.set(redisKey, jsonData, 24 * 60 * 60); // 24小时过期
|
||||
redisCache.setCacheObject(redisKey, jsonData, 24 * 60 * 60, TimeUnit.SECONDS); // 24小时过期
|
||||
|
||||
log.info("成功缓存{}个交易量最多的商品到Redis,key: {}", topTradingProducts.size(), redisKey);
|
||||
} else {
|
||||
|
Loading…
x
Reference in New Issue
Block a user