消息推送加自动触发
This commit is contained in:
parent
029c4c1f2a
commit
7e027b493d
6
pom.xml
6
pom.xml
@ -31,7 +31,7 @@
|
||||
<redisson.version>3.45.1</redisson.version>
|
||||
<lock4j.version>2.2.7</lock4j.version>
|
||||
<dynamic-ds.version>4.3.1</dynamic-ds.version>
|
||||
<snailjob.version>1.4.0</snailjob.version>
|
||||
<snailjob.version>1.5.0</snailjob.version>
|
||||
<mapstruct-plus.version>1.4.6</mapstruct-plus.version>
|
||||
<mapstruct-plus.lombok.version>0.2.0</mapstruct-plus.lombok.version>
|
||||
<lombok.version>1.18.36</lombok.version>
|
||||
@ -306,12 +306,12 @@
|
||||
<dependency>
|
||||
<groupId>com.aizuda</groupId>
|
||||
<artifactId>snail-job-client-starter</artifactId>
|
||||
<version>${snailjob.version}</version>
|
||||
<version>1.5.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.aizuda</groupId>
|
||||
<artifactId>snail-job-client-job-core</artifactId>
|
||||
<version>${snailjob.version}</version>
|
||||
<version>1.5.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 加密包引入 -->
|
||||
|
@ -20,7 +20,7 @@ spring:
|
||||
|
||||
--- # snail-job 配置
|
||||
snail-job:
|
||||
enabled: false
|
||||
enabled: true
|
||||
# 需要在 SnailJob 后台组管理创建对应名称的组,然后创建任务的时候选择对应的组,才能正确分派任务
|
||||
group: "ruoyi_group"
|
||||
# SnailJob 接入验证令牌 详见 script/sql/ry_job.sql `sj_group_config` 表
|
||||
|
@ -26,10 +26,12 @@
|
||||
<dependency>
|
||||
<groupId>com.aizuda</groupId>
|
||||
<artifactId>snail-job-client-starter</artifactId>
|
||||
<version>1.5.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.aizuda</groupId>
|
||||
<artifactId>snail-job-client-job-core</artifactId>
|
||||
<version>1.5.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
@ -174,6 +174,10 @@
|
||||
<artifactId>tencentcloud-sdk-java-common</artifactId>
|
||||
<version>3.1.1030</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-system</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 腾讯云SDK核心模块 -->
|
||||
<!-- <dependency>-->
|
||||
|
@ -49,21 +49,16 @@ public class CommentController extends BaseController {
|
||||
}
|
||||
|
||||
@ApiOperation("查询视频评论列表")
|
||||
@GetMapping("/vlogComments")
|
||||
public TableDataInfo<CommentVO> queryVlogComments(
|
||||
@ApiParam(value = "视频ID") @RequestParam String vlogId,
|
||||
@ApiParam(value = "用户ID") @RequestParam(required = false) String userId,
|
||||
PageQuery pageQuery) {
|
||||
@PostMapping("/vlogComments")
|
||||
public R<Page<CommentVO>> queryVlogComments(
|
||||
@RequestBody CommentBO bo,
|
||||
@RequestBody Page<Comment> page) {
|
||||
try {
|
||||
return commentService.queryVlogComments(vlogId, userId, pageQuery);
|
||||
Page<CommentVO> commentPage = commentService.pageComment(page, bo);
|
||||
return R.ok(commentPage);
|
||||
} catch (Exception e) {
|
||||
log.error("查询视频评论列表失败", e);
|
||||
TableDataInfo<CommentVO> errorResponse = new TableDataInfo<>();
|
||||
errorResponse.setCode(HttpStatus.ERROR);
|
||||
errorResponse.setMsg("查询视频评论列表失败: " + e.getMessage());
|
||||
errorResponse.setRows(Collections.emptyList());
|
||||
errorResponse.setTotal(0L);
|
||||
return errorResponse;
|
||||
return R.fail("查询视频评论列表失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,16 @@
|
||||
package com.wzj.soopin.content.convert;
|
||||
|
||||
import com.wzj.soopin.content.domain.po.Vlog;
|
||||
import com.wzj.soopin.content.domain.vo.IndexVlogVO;
|
||||
import com.wzj.soopin.content.domain.bo.VlogBO;
|
||||
import org.mapstruct.Mapper;
|
||||
import org.mapstruct.factory.Mappers;
|
||||
|
||||
@Mapper(componentModel = "spring")
|
||||
public interface VlogConvert {
|
||||
VlogConvert INSTANCE = Mappers.getMapper(VlogConvert.class);
|
||||
|
||||
IndexVlogVO toVO(Vlog vlog);
|
||||
|
||||
Vlog toEntity(VlogBO vlogBO);
|
||||
}
|
@ -27,11 +27,16 @@ import com.wzj.soopin.content.utils.RedisOperator;
|
||||
import com.wzj.soopin.content.utils.Sid;
|
||||
import com.wzj.soopin.content.utils.TencentCloudUtil;
|
||||
import com.wzj.soopin.member.service.IFansService;
|
||||
import com.wzj.soopin.content.convert.VlogConvert;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.dromara.system.service.ISysMessageService;
|
||||
import org.dromara.system.service.ISysMessageTemplateService;
|
||||
import org.dromara.system.domain.bo.SysMessageBo;
|
||||
import org.dromara.system.domain.vo.SysMessageTemplateVo;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
@ -63,6 +68,12 @@ public class VlogServiceImpl extends BaseInfoProperties implements VlogService {
|
||||
private TencentCloudUtil tencentCloudUtil;
|
||||
@Autowired
|
||||
private UsersMapper usersMapper;
|
||||
@Autowired
|
||||
private VlogConvert vlogConvert;
|
||||
@Autowired
|
||||
private ISysMessageService sysMessageService;
|
||||
@Autowired
|
||||
private ISysMessageTemplateService templateService;
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
@ -72,6 +83,31 @@ public class VlogServiceImpl extends BaseInfoProperties implements VlogService {
|
||||
.set(Vlog::getStatus, status)
|
||||
.set(Vlog::getReason, reason);
|
||||
vlogMapper.update(null, updateWrapper);
|
||||
|
||||
// 1. 获取视频信息,找到上传者
|
||||
Vlog vlog = vlogMapper.selectByFileId(fileId);
|
||||
if (vlog == null) return;
|
||||
String uploaderId = vlog.getVlogerId();
|
||||
|
||||
// 2. 选择模板(假设你有模板ID,实际可配置到常量或数据库)
|
||||
Long templateId = (status == 1) ? 1001L : 1002L; // 1001=审核通过模板,1002=驳回模板
|
||||
SysMessageTemplateVo template = templateService.selectTemplateById(templateId);
|
||||
if (template == null) return;
|
||||
|
||||
// 3. 参数替换
|
||||
String content = template.getTemplateContent()
|
||||
.replace("${videoTitle}", vlog.getTitle() == null ? "" : vlog.getTitle())
|
||||
.replace("${reason}", reason == null ? "" : reason);
|
||||
|
||||
// 4. 构造消息对象
|
||||
SysMessageBo messageBo = new SysMessageBo();
|
||||
messageBo.setTitle(template.getTitle());
|
||||
messageBo.setContent(content);
|
||||
messageBo.setSenderId(1L); // 系统管理员
|
||||
// 可根据需要设置更多字段
|
||||
|
||||
// 5. 发送消息
|
||||
sysMessageService.sendMessageToUser(messageBo, Long.valueOf(uploaderId));
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@ -88,9 +124,7 @@ public class VlogServiceImpl extends BaseInfoProperties implements VlogService {
|
||||
@Override
|
||||
public void createVlog(VlogBO vlogBO) {
|
||||
String vid = sid.nextShort();
|
||||
Vlog vlog = new Vlog();
|
||||
BeanUtils.copyProperties(vlogBO, vlog);
|
||||
|
||||
Vlog vlog = vlogConvert.toEntity(vlogBO);
|
||||
vlog.setId(vid);
|
||||
vlog.setLikeCounts(0);
|
||||
vlog.setCommentsCounts(0);
|
||||
@ -165,8 +199,7 @@ public class VlogServiceImpl extends BaseInfoProperties implements VlogService {
|
||||
List<Vlog> vlogList = vlogPage.getRecords();
|
||||
|
||||
List<IndexVlogVO> voList = vlogList.stream().map(v -> {
|
||||
IndexVlogVO vo = new IndexVlogVO();
|
||||
BeanUtils.copyProperties(v, vo);
|
||||
IndexVlogVO vo = vlogConvert.toVO(v);
|
||||
|
||||
if (StringUtils.isNotBlank(userId)) {
|
||||
vo.setDoIFollowVloger(fansService.queryDoIFollowVloger(Long.valueOf(userId), Long.valueOf(v.getVlogerId())));
|
||||
@ -211,8 +244,7 @@ public class VlogServiceImpl extends BaseInfoProperties implements VlogService {
|
||||
return null;
|
||||
}
|
||||
|
||||
IndexVlogVO vo = new IndexVlogVO();
|
||||
BeanUtils.copyProperties(vlog, vo);
|
||||
IndexVlogVO vo = vlogConvert.toVO(vlog);
|
||||
|
||||
if (StringUtils.isNotBlank(userId)) {
|
||||
vo.setDoIFollowVloger(fansService.queryDoIFollowVloger(userId, vlog.getVlogerId()));
|
||||
@ -256,8 +288,7 @@ public class VlogServiceImpl extends BaseInfoProperties implements VlogService {
|
||||
List<Vlog> vlogList = vlogPage.getRecords();
|
||||
|
||||
List<IndexVlogVO> voList = vlogList.stream().map(v -> {
|
||||
IndexVlogVO vo = new IndexVlogVO();
|
||||
BeanUtils.copyProperties(v, vo);
|
||||
IndexVlogVO vo = vlogConvert.toVO(v);
|
||||
|
||||
if (StringUtils.isNotBlank(myId)) {
|
||||
vo.setDoIFollowVloger(fansService.queryDoIFollowVloger(myId, v.getVlogerId()));
|
||||
@ -289,6 +320,26 @@ public class VlogServiceImpl extends BaseInfoProperties implements VlogService {
|
||||
redis.increment(REDIS_VLOG_BE_LIKED_COUNTS + ":" + vlogId, 1);
|
||||
// 保存用户和视频的喜欢关系
|
||||
redis.set(REDIS_USER_LIKE_VLOG + ":" + userId + ":" + vlogId, "1");
|
||||
|
||||
// 发送点赞通知
|
||||
Vlog vlog = vlogMapper.selectById(vlogId);
|
||||
if (vlog != null) {
|
||||
String vlogerId = vlog.getVlogerId();
|
||||
if (!userId.equals(vlogerId)) { // 不给自己发
|
||||
Long templateId = 1003L; // 假设1003是点赞通知模板
|
||||
SysMessageTemplateVo template = templateService.selectTemplateById(templateId);
|
||||
if (template != null) {
|
||||
String content = template.getTemplateContent()
|
||||
.replace("${videoTitle}", vlog.getTitle() == null ? "" : vlog.getTitle())
|
||||
.replace("${liker}", userId); // 可查昵称
|
||||
SysMessageBo messageBo = new SysMessageBo();
|
||||
messageBo.setTitle(template.getTitle());
|
||||
messageBo.setContent(content);
|
||||
messageBo.setSenderId(Long.valueOf(userId));
|
||||
sysMessageService.sendMessageToUser(messageBo, Long.valueOf(vlogerId));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -336,8 +387,7 @@ public class VlogServiceImpl extends BaseInfoProperties implements VlogService {
|
||||
return null;
|
||||
}
|
||||
|
||||
IndexVlogVO vo = new IndexVlogVO();
|
||||
BeanUtils.copyProperties(vlog, vo);
|
||||
IndexVlogVO vo = vlogConvert.toVO(vlog);
|
||||
|
||||
if (StringUtils.isNotBlank(myId)) {
|
||||
vo.setDoIFollowVloger(fansService.queryDoIFollowVloger(myId, vlog.getVlogerId()));
|
||||
|
@ -100,7 +100,8 @@
|
||||
<artifactId>ruoyi-common-sse</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- <dependency>-->
|
||||
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.dromara</groupId>-->
|
||||
<!-- <artifactId>ruoyi-order</artifactId>-->
|
||||
<!-- <scope>provided</scope>-->
|
||||
|
@ -157,15 +157,11 @@
|
||||
<artifactId>hutool-all</artifactId>
|
||||
<version>5.8.22</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-content</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-content</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-content</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
@ -27,7 +27,10 @@
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-common-doc</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-common-job</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-common-mybatis</artifactId>
|
||||
|
@ -279,6 +279,26 @@ public class SysMessageController extends BaseController {
|
||||
return R.fail("未找到消息接收者");
|
||||
}
|
||||
|
||||
// 判断是否为定时消息,若是则注册SnailJob单次任务
|
||||
// if (bo.getScheduledTime() != null && bo.getScheduledTime().after(new java.util.Date())) {
|
||||
// // 先保存消息,获取ID
|
||||
// SysMessage entity = bo.toEntity();
|
||||
// entity.setStatus("0"); // 未发送
|
||||
// messageService.getBaseMapper().insert(entity);
|
||||
// // 创建SnailJob单次任务
|
||||
// org.dromara.system.job.SnailJobApiUtil.createSingleJob(entity.getId(), entity.getScheduledTime());
|
||||
// // 关联用户
|
||||
// for (String userId : userIdStrings) {
|
||||
// org.dromara.system.domain.SysMessageUser messageUser = new org.dromara.system.domain.SysMessageUser();
|
||||
// messageUser.setMessageId(entity.getId());
|
||||
// messageUser.setUserId(Long.valueOf(userId));
|
||||
// messageUser.setIsRead(false);
|
||||
// // 这里假设有messageUserMapper可用,实际应通过service或mapper注入
|
||||
// // messageUserMapper.insert(messageUser);
|
||||
// }
|
||||
// return toAjax(1);
|
||||
// }
|
||||
|
||||
return toAjax(messageService.sendMessageToUsers(bo, userIdStrings));
|
||||
}
|
||||
|
||||
|
@ -48,8 +48,8 @@ public class SysMessage extends BaseAudit {
|
||||
/** 定时发送时间 */
|
||||
private Date scheduledTime;
|
||||
|
||||
// /** 状态(0正常 1停用) */
|
||||
// private String status;
|
||||
/** 状态(0未发送 1已发送) */
|
||||
private String status;
|
||||
|
||||
/** 扩展参数 */
|
||||
@TableField(exist = false)
|
||||
|
@ -60,6 +60,7 @@ public class SysMessageBo extends BaseAudit {
|
||||
|
||||
/** 定时发送时间 */
|
||||
@ExcelProperty(value = "定时发送时间")
|
||||
@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSX", timezone = "Asia/Shanghai")
|
||||
private Date scheduledTime;
|
||||
|
||||
/** 发送范围(all:全部用户, userType:按用户类型, userIds:指定用户) */
|
||||
|
@ -51,15 +51,6 @@ public interface ISysMessageService extends IService<SysMessage> {
|
||||
*/
|
||||
int sendMessageToUserByStringId(SysMessageBo message, String userId);
|
||||
|
||||
/**
|
||||
* 向多个用户发送消息
|
||||
*
|
||||
* @param message 消息信息
|
||||
* @param userIds 用户ID列表 (Long类型)
|
||||
* @return 结果
|
||||
*/
|
||||
// int sendMessageToUsers(SysMessageBo message, List<Long> userIds);
|
||||
|
||||
/**
|
||||
* 向多个用户发送消息
|
||||
*
|
||||
@ -161,5 +152,25 @@ public interface ISysMessageService extends IService<SysMessage> {
|
||||
*/
|
||||
LambdaQueryWrapper<SysMessage> toWrapper(SysMessageBo bo);
|
||||
|
||||
/**
|
||||
* 查询所有未发送且到期的定时消息
|
||||
* @param now 当前时间
|
||||
* @return 待发送的定时消息列表
|
||||
*/
|
||||
List<SysMessage> getScheduledMessagesToSend(java.util.Date now);
|
||||
|
||||
/**
|
||||
* 更新消息状态
|
||||
* @param messageId 消息ID
|
||||
* @param status 状态(0未发送 1已发送)
|
||||
*/
|
||||
void updateMessageStatus(Long messageId, String status);
|
||||
|
||||
/**
|
||||
* 查询消息实体对象
|
||||
* @param id 消息ID
|
||||
* @return SysMessage 实体
|
||||
*/
|
||||
SysMessage selectEntityById(Long id);
|
||||
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import org.dromara.system.domain.SysMessageUser;
|
||||
import org.dromara.system.domain.bo.SysMessageBo;
|
||||
import org.dromara.system.domain.event.MessageEvent;
|
||||
import org.dromara.system.domain.vo.SysMessageVo;
|
||||
//import org.dromara.system.job.SnailJobApiUtil;
|
||||
import org.dromara.system.mapper.SysMessageMapper;
|
||||
import org.dromara.system.mapper.SysMessageUserMapper;
|
||||
import org.dromara.system.service.ISysMessageService;
|
||||
@ -22,6 +23,7 @@ import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -87,18 +89,40 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
|
||||
public int sendMessageToUser(SysMessageBo message, Long userId) {
|
||||
// 保存消息
|
||||
SysMessage entity = message.toEntity();
|
||||
Date scheduledTime = entity.getScheduledTime();
|
||||
// if (scheduledTime != null && scheduledTime.after(new Date())) {
|
||||
// entity.setStatus("0"); // 未发送
|
||||
// messageMapper.insert(entity);
|
||||
// // 注册SnailJob单次任务
|
||||
// SnailJobApiUtil.createSingleJob(entity.getId(), scheduledTime);
|
||||
// // 检查消息与用户关联是否已存在
|
||||
// LambdaQueryWrapper<SysMessageUser> queryWrapper = new LambdaQueryWrapper<>();
|
||||
// queryWrapper.eq(SysMessageUser::getMessageId, entity.getId())
|
||||
// .eq(SysMessageUser::getUserId, userId);
|
||||
// int count = Math.toIntExact(messageUserMapper.selectCount(queryWrapper));
|
||||
// int rows = 0;
|
||||
// if (count == 0) {
|
||||
// SysMessageUser messageUser = new SysMessageUser();
|
||||
// messageUser.setMessageId(entity.getId());
|
||||
// messageUser.setUserId(userId);
|
||||
// messageUser.setIsRead(false);
|
||||
// rows = messageUserMapper.insert(messageUser);
|
||||
// } else {
|
||||
// log.info("消息与用户关联已存在,跳过创建: messageId={}, userId={}", entity.getId(), userId);
|
||||
// rows = 1;
|
||||
// }
|
||||
// // 定时消息不发布事件
|
||||
// return rows;
|
||||
// } else {
|
||||
entity.setStatus("1"); // 已发送
|
||||
messageMapper.insert(entity);
|
||||
|
||||
// 检查消息与用户关联是否已存在
|
||||
LambdaQueryWrapper<SysMessageUser> queryWrapper = new LambdaQueryWrapper<>();
|
||||
queryWrapper.eq(SysMessageUser::getMessageId, entity.getId())
|
||||
.eq(SysMessageUser::getUserId, userId);
|
||||
int count = Math.toIntExact(messageUserMapper.selectCount(queryWrapper));
|
||||
|
||||
// 只有当关联不存在时才创建
|
||||
int rows = 0;
|
||||
if (count == 0) {
|
||||
// 创建消息用户关联
|
||||
SysMessageUser messageUser = new SysMessageUser();
|
||||
messageUser.setMessageId(entity.getId());
|
||||
messageUser.setUserId(userId);
|
||||
@ -106,16 +130,15 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
|
||||
rows = messageUserMapper.insert(messageUser);
|
||||
} else {
|
||||
log.info("消息与用户关联已存在,跳过创建: messageId={}, userId={}", entity.getId(), userId);
|
||||
// 已存在也认为操作成功
|
||||
rows = 1;
|
||||
}
|
||||
|
||||
// 发送WebSocket消息
|
||||
if (rows > 0) {
|
||||
SysMessageVo messageVo = MapstructUtils.convert(entity, SysMessageVo.class);
|
||||
eventPublisher.publishEvent(new MessageEvent(this, messageVo, userId));
|
||||
}
|
||||
return rows;
|
||||
// }
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -124,23 +147,71 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
|
||||
if (userIds == null || userIds.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 保存消息
|
||||
SysMessage entity = message.toEntity();
|
||||
Date scheduledTime = entity.getScheduledTime();
|
||||
// if (scheduledTime != null && scheduledTime.after(new Date())) {
|
||||
// entity.setStatus("0"); // 未发送
|
||||
// messageMapper.insert(entity);
|
||||
// // 批量创建消息用户关联
|
||||
// int count = 0;
|
||||
// List<Long> validUserIds = new ArrayList<>();
|
||||
// Map<Long, String> userIdStrMap = new HashMap<>();
|
||||
// for (String userIdStr : userIds) {
|
||||
// if (StringUtils.isBlank(userIdStr)) {
|
||||
// continue;
|
||||
// }
|
||||
// try {
|
||||
// Long userId = Long.parseLong(userIdStr);
|
||||
// validUserIds.add(userId);
|
||||
// userIdStrMap.put(userId, userIdStr);
|
||||
// } catch (NumberFormatException e) {
|
||||
// log.error("无法将String类型的用户ID转换为Long: {}", userIdStr);
|
||||
// }
|
||||
// }
|
||||
// if (!validUserIds.isEmpty()) {
|
||||
// LambdaQueryWrapper<SysMessageUser> queryWrapper = new LambdaQueryWrapper<>();
|
||||
// queryWrapper.eq(SysMessageUser::getMessageId, entity.getId())
|
||||
// .in(SysMessageUser::getUserId, validUserIds);
|
||||
// List<SysMessageUser> existingRecords = messageUserMapper.selectList(queryWrapper);
|
||||
// Set<Long> existingUserIds = existingRecords.stream()
|
||||
// .map(SysMessageUser::getUserId)
|
||||
// .collect(Collectors.toSet());
|
||||
// log.info("已存在的消息关联记录数: {}", existingUserIds.size());
|
||||
// for (Long userId : validUserIds) {
|
||||
// if (!existingUserIds.contains(userId)) {
|
||||
// try {
|
||||
// SysMessageUser messageUser = new SysMessageUser();
|
||||
// messageUser.setMessageId(entity.getId());
|
||||
// messageUser.setUserId(userId);
|
||||
// messageUser.setIsRead(false);
|
||||
// int rows = messageUserMapper.insert(messageUser);
|
||||
// if (rows > 0) {
|
||||
// count++;
|
||||
// }
|
||||
// } catch (Exception e) {
|
||||
// log.error("创建消息用户关联失败: messageId={}, userId={}, error={}",
|
||||
// entity.getId(), userId, e.getMessage());
|
||||
// }
|
||||
// } else {
|
||||
// log.info("消息与用户关联已存在,跳过创建: messageId={}, userId={}", entity.getId(), userId);
|
||||
// count++;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// // 定时消息不发布事件
|
||||
// return count;
|
||||
// } else {
|
||||
entity.setStatus("1"); // 已发送
|
||||
messageMapper.insert(entity);
|
||||
|
||||
// 批量创建消息用户关联
|
||||
int count = 0;
|
||||
|
||||
// 先过滤有效的用户ID
|
||||
List<Long> validUserIds = new ArrayList<>();
|
||||
Map<Long, String> userIdStrMap = new HashMap<>();
|
||||
|
||||
for (String userIdStr : userIds) {
|
||||
if (StringUtils.isBlank(userIdStr)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
Long userId = Long.parseLong(userIdStr);
|
||||
validUserIds.add(userId);
|
||||
@ -149,22 +220,15 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
|
||||
log.error("无法将String类型的用户ID转换为Long: {}", userIdStr);
|
||||
}
|
||||
}
|
||||
|
||||
// 一次性查询所有已存在的关联记录
|
||||
if (!validUserIds.isEmpty()) {
|
||||
LambdaQueryWrapper<SysMessageUser> queryWrapper = new LambdaQueryWrapper<>();
|
||||
queryWrapper.eq(SysMessageUser::getMessageId, entity.getId())
|
||||
.in(SysMessageUser::getUserId, validUserIds);
|
||||
List<SysMessageUser> existingRecords = messageUserMapper.selectList(queryWrapper);
|
||||
|
||||
// 创建已存在用户ID的集合
|
||||
Set<Long> existingUserIds = existingRecords.stream()
|
||||
.map(SysMessageUser::getUserId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
log.info("已存在的消息关联记录数: {}", existingUserIds.size());
|
||||
|
||||
// 对不存在关联的用户批量插入
|
||||
for (Long userId : validUserIds) {
|
||||
if (!existingUserIds.contains(userId)) {
|
||||
try {
|
||||
@ -177,29 +241,24 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
|
||||
count++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("创建消息用户关联失败: messageId={}, userId={}, error={}",
|
||||
log.error("创建消息用户关联失败: messageId={}, userId={}, error={}",
|
||||
entity.getId(), userId, e.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.info("消息与用户关联已存在,跳过创建: messageId={}, userId={}", entity.getId(), userId);
|
||||
// 已存在也计入成功数量
|
||||
count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 最后只发送一次批量事件通知
|
||||
if (count > 0) {
|
||||
SysMessageVo messageVo = MapstructUtils.convert(entity, SysMessageVo.class);
|
||||
// 为所有有效用户ID组装一个逗号分隔的字符串
|
||||
String batchUserIds = validUserIds.stream()
|
||||
.map(String::valueOf)
|
||||
.collect(Collectors.joining(","));
|
||||
// 发布一个批量事件,在MessageEvent和MessageEventListener中进行处理
|
||||
eventPublisher.publishEvent(MessageEvent.createWithStringUserId(this, messageVo, batchUserIds));
|
||||
for (Long userId : validUserIds) {
|
||||
eventPublisher.publishEvent(new MessageEvent(this, messageVo, userId));
|
||||
}
|
||||
}
|
||||
|
||||
return count;
|
||||
// }
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -313,4 +372,25 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SysMessage> getScheduledMessagesToSend(java.util.Date now) {
|
||||
return this.lambdaQuery()
|
||||
.eq(SysMessage::getStatus, "0")
|
||||
.le(SysMessage::getScheduledTime, now)
|
||||
.list();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateMessageStatus(Long messageId, String status) {
|
||||
this.lambdaUpdate()
|
||||
.eq(SysMessage::getId, messageId)
|
||||
.set(SysMessage::getStatus, status)
|
||||
.update();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SysMessage selectEntityById(Long id) {
|
||||
return messageMapper.selectById(id);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -36,43 +36,34 @@ public class MessageScheduledTask {
|
||||
/**
|
||||
* 每分钟执行一次,处理待发送的消息
|
||||
*/
|
||||
@Scheduled(cron = "0 */1 * * * ?")
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void processScheduledMessages() {
|
||||
log.info("开始处理定时消息...");
|
||||
try {
|
||||
// 查询待发送的消息
|
||||
LambdaQueryWrapper<SysMessage> lqw = new LambdaQueryWrapper<>();
|
||||
// lqw.eq(SysMessage::getStatus, "0")
|
||||
// .le(SysMessage::getScheduledTime, LocalDateTime.now())
|
||||
// .isNotNull(SysMessage::getScheduledTime);
|
||||
|
||||
List<SysMessage> messages = messageMapper.selectList(lqw);
|
||||
|
||||
for (SysMessage message : messages) {
|
||||
// 查询消息接收者
|
||||
LambdaQueryWrapper<SysMessageUser> userLqw = new LambdaQueryWrapper<>();
|
||||
userLqw.eq(SysMessageUser::getMessageId, message.getId());
|
||||
List<SysMessageUser> messageUsers = messageUserMapper.selectList(userLqw);
|
||||
|
||||
// 发送消息
|
||||
SysMessageVo messageVo = MapstructUtils.convert(message, SysMessageVo.class);
|
||||
for (SysMessageUser messageUser : messageUsers) {
|
||||
try {
|
||||
messageWebSocketServer.sendMessage(messageUser.getUserId(), String.valueOf(messageVo));
|
||||
} catch (Exception e) {
|
||||
log.error("发送消息失败,消息ID:{},用户ID:{}", message.getId(), messageUser.getUserId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// 更新消息状态为已发送
|
||||
// message.setStatus("1");
|
||||
messageMapper.updateById(message);
|
||||
}
|
||||
|
||||
log.info("定时消息处理完成,共处理{}条消息", messages.size());
|
||||
} catch (Exception e) {
|
||||
log.error("处理定时消息时发生错误", e);
|
||||
}
|
||||
}
|
||||
// @Scheduled(cron = "0 */1 * * * ?")
|
||||
// @Transactional(rollbackFor = Exception.class)
|
||||
// public void processScheduledMessages() {
|
||||
// log.info("开始处理定时消息...");
|
||||
// try {
|
||||
// // 查询待发送的消息
|
||||
// LambdaQueryWrapper<SysMessage> lqw = new LambdaQueryWrapper<>();
|
||||
// List<SysMessage> messages = messageMapper.selectList(lqw);
|
||||
// for (SysMessage message : messages) {
|
||||
// // 查询消息接收者
|
||||
// LambdaQueryWrapper<SysMessageUser> userLqw = new LambdaQueryWrapper<>();
|
||||
// userLqw.eq(SysMessageUser::getMessageId, message.getId());
|
||||
// List<SysMessageUser> messageUsers = messageUserMapper.selectList(userLqw);
|
||||
// // 发送消息
|
||||
// SysMessageVo messageVo = MapstructUtils.convert(message, SysMessageVo.class);
|
||||
// for (SysMessageUser messageUser : messageUsers) {
|
||||
// try {
|
||||
// messageWebSocketServer.sendMessage(messageUser.getUserId(), String.valueOf(messageVo));
|
||||
// } catch (Exception e) {
|
||||
// log.error("发送消息失败,消息ID:{},用户ID:{}", message.getId(), messageUser.getUserId(), e);
|
||||
// }
|
||||
// }
|
||||
// // 更新消息状态为已发送
|
||||
// messageMapper.updateById(message);
|
||||
// }
|
||||
// log.info("定时消息处理完成,共处理{}条消息", messages.size());
|
||||
// } catch (Exception e) {
|
||||
// log.error("处理定时消息时发生错误", e);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user