From bf728cee9bbfc98be9e6ff16bbed0fbd027c8312 Mon Sep 17 00:00:00 2001 From: wangqx Date: Fri, 5 Sep 2025 17:58:30 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=E4=BF=AE=E6=94=B9=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/mq/config/RocketMQConfig.java | 2 ++ .../dromara/common/oss/core/OssClient.java | 23 ++++++++++++++++ .../consumer/VlogUploadMessageConsumer.java | 15 ++++++++--- .../content/service/impl/VlogServiceImpl.java | 27 +++++++------------ 4 files changed, 45 insertions(+), 22 deletions(-) diff --git a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java index 73bb30fdf..91fa60a4e 100644 --- a/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java +++ b/ruoyi-common/ruoyi-common-mq/src/main/java/org/dromara/common/mq/config/RocketMQConfig.java @@ -34,4 +34,6 @@ public class RocketMQConfig { * 视频上传主题 */ public static final String VLOG_UPLOAD_TOPIC = "VLOG_UPLOAD_TOPIC"; + + public static final String VLOG_UPLOAD_GROUP = "VLOG_UPLOAD_GROUP"; } diff --git a/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java index de5119e91..f03c0d427 100644 --- a/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java +++ b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java @@ -230,6 +230,29 @@ public class OssClient { downloadFile.completionFuture().join(); return tempFilePath; } + /** + * 下载文件从 Amazon S3 到临时目录 + * + * @param path 文件在 Amazon S3 中的对象键 + * @return 下载后的文件在本地的临时路径 + * @throws OssException 如果下载失败,抛出自定义异常 + */ + public Path fileDownload(String path,String tempFile) { + // 构建临时文件 + Path tempFilePath = FileUtils.createTempFile(tempFile,true).toPath(); + // 使用 S3TransferManager 下载文件 + FileDownload downloadFile = transferManager.downloadFile( + x -> x.getObjectRequest( + y -> y.bucket(properties.getBucketName()) + .key(removeBaseUrl(path)) + .build()) + .addTransferListener(LoggingTransferListener.create()) + .destination(tempFilePath) + .build()); + // 等待文件下载操作完成 + downloadFile.completionFuture().join(); + return tempFilePath; + } /** * 下载文件从 Amazon S3 到 输出流 diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/consumer/VlogUploadMessageConsumer.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/consumer/VlogUploadMessageConsumer.java index 60af0b08d..649778392 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/consumer/VlogUploadMessageConsumer.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/consumer/VlogUploadMessageConsumer.java @@ -34,7 +34,7 @@ import java.nio.file.Path; @RequiredArgsConstructor @RocketMQMessageListener( topic = RocketMQConfig.VLOG_UPLOAD_TOPIC, - consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG, + consumerGroup = RocketMQConfig.VLOG_UPLOAD_GROUP, selectorExpression = "upload" ) public class VlogUploadMessageConsumer implements RocketMQListener { @@ -44,12 +44,14 @@ public class VlogUploadMessageConsumer implements RocketMQListener { @Override public void onMessage(MessageExt messageExt) { // 参数为MessageExt + String message = new String(messageExt.getBody()); log.info("接收到RocketMQ消息: {}, msgId: {}", message, messageExt.getMsgId()); try { MQMessage mqMessage = JsonUtils.parseObject(message, MQMessage.class); - if (mqMessage != null) { + + if (mqMessage != null&&messageExt.getBody()!=null) { boolean result = this.handleMessage(mqMessage); if (result) { // 消息处理成功,手动确认 @@ -80,6 +82,9 @@ public class VlogUploadMessageConsumer implements RocketMQListener { //视频上传消息会包含视频的内容 Vlog vlog = vlogService.getById((String)mqMessage.getData()); //获取文件内容 + if(ObjectUtil.isNull(vlog)){ + return false; + } String fileId=vlog.getFileId(); //检查该文件是否为oss文件,如果不是说明已经上传过了 if(!vlog.getUrl().contains("#")){ @@ -89,12 +94,14 @@ public class VlogUploadMessageConsumer implements RocketMQListener { String storagePath = "/data/vlogdata"; //从oss下载文件 - SysOssVo sysOss =ossService.getById(Long.getLong(fileId)) ; + SysOssVo sysOss =ossService.getById(Long.parseLong(fileId)) ; if (ObjectUtil.isNull(sysOss)) { throw new ServiceException("文件数据不存在!"); } OssClient ossClient = OssFactory.instance(sysOss.getService()); - Path filePath=ossClient.fileDownload(sysOss.getUrl()); + Path filePath=ossClient.fileDownload(sysOss.getUrl(),sysOss.getOriginalName()); + //对文件进行重命名 + fileId = qcCloud.uploadViaTempFile(filePath); log.info("视频发布ID:" + fileId); diff --git a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java index d2b8023a6..7023cf546 100644 --- a/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java +++ b/ruoyi-modules/ruoyi-content/src/main/java/com/wzj/soopin/content/service/impl/VlogServiceImpl.java @@ -161,24 +161,15 @@ public class VlogServiceImpl extends ServiceImpl implements Vl //发出mq消息,异步处理上传 -// MQMessage message = MQMessage.builder() -// .messageType("json") -// .data(vlog.getId()) -// .source("app") -// .topic("VLOG_UPLOAD_TOPIC") -// .tag("upload") -// .sendTime(LocalDateTime.now()) -// .build(); -// MQMessage message = MQMessage.builder() -// .topic("VLOG_UPLOAD_TOPIC") -// .tag("upload") -// .messageType("json") -// .data("d") -// .source("vlog_service") -// .sendTime(LocalDateTime.now()) -// .build(); - MQMessage mqMessage = new MQMessage(); - MqUtil.sendMessage(RocketMQConfig.VLOG_UPLOAD_TOPIC+":"+"upload", mqMessage); + MQMessage message = MQMessage.builder() + .messageType("json") + .data(vlog.getId()) + .source("app") + .topic("VLOG_UPLOAD_TOPIC") + .tag("upload") + .sendTime(LocalDateTime.now()) + .build(); + MqUtil.sendMessage(RocketMQConfig.VLOG_UPLOAD_TOPIC+":"+"upload", message); }