[fix]修改视频上传
This commit is contained in:
parent
2616b5fff8
commit
bf728cee9b
@ -34,4 +34,6 @@ public class RocketMQConfig {
|
||||
* 视频上传主题
|
||||
*/
|
||||
public static final String VLOG_UPLOAD_TOPIC = "VLOG_UPLOAD_TOPIC";
|
||||
|
||||
public static final String VLOG_UPLOAD_GROUP = "VLOG_UPLOAD_GROUP";
|
||||
}
|
||||
|
@ -230,6 +230,29 @@ public class OssClient {
|
||||
downloadFile.completionFuture().join();
|
||||
return tempFilePath;
|
||||
}
|
||||
/**
|
||||
* 下载文件从 Amazon S3 到临时目录
|
||||
*
|
||||
* @param path 文件在 Amazon S3 中的对象键
|
||||
* @return 下载后的文件在本地的临时路径
|
||||
* @throws OssException 如果下载失败,抛出自定义异常
|
||||
*/
|
||||
public Path fileDownload(String path,String tempFile) {
|
||||
// 构建临时文件
|
||||
Path tempFilePath = FileUtils.createTempFile(tempFile,true).toPath();
|
||||
// 使用 S3TransferManager 下载文件
|
||||
FileDownload downloadFile = transferManager.downloadFile(
|
||||
x -> x.getObjectRequest(
|
||||
y -> y.bucket(properties.getBucketName())
|
||||
.key(removeBaseUrl(path))
|
||||
.build())
|
||||
.addTransferListener(LoggingTransferListener.create())
|
||||
.destination(tempFilePath)
|
||||
.build());
|
||||
// 等待文件下载操作完成
|
||||
downloadFile.completionFuture().join();
|
||||
return tempFilePath;
|
||||
}
|
||||
|
||||
/**
|
||||
* 下载文件从 Amazon S3 到 输出流
|
||||
|
@ -34,7 +34,7 @@ import java.nio.file.Path;
|
||||
@RequiredArgsConstructor
|
||||
@RocketMQMessageListener(
|
||||
topic = RocketMQConfig.VLOG_UPLOAD_TOPIC,
|
||||
consumerGroup = RocketMQConfig.CONSUMER_GROUP_SYS_MSG,
|
||||
consumerGroup = RocketMQConfig.VLOG_UPLOAD_GROUP,
|
||||
selectorExpression = "upload"
|
||||
)
|
||||
public class VlogUploadMessageConsumer implements RocketMQListener<MessageExt> {
|
||||
@ -44,12 +44,14 @@ public class VlogUploadMessageConsumer implements RocketMQListener<MessageExt> {
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt messageExt) { // 参数为MessageExt
|
||||
|
||||
String message = new String(messageExt.getBody());
|
||||
log.info("接收到RocketMQ消息: {}, msgId: {}", message, messageExt.getMsgId());
|
||||
|
||||
try {
|
||||
MQMessage mqMessage = JsonUtils.parseObject(message, MQMessage.class);
|
||||
if (mqMessage != null) {
|
||||
|
||||
if (mqMessage != null&&messageExt.getBody()!=null) {
|
||||
boolean result = this.handleMessage(mqMessage);
|
||||
if (result) {
|
||||
// 消息处理成功,手动确认
|
||||
@ -80,6 +82,9 @@ public class VlogUploadMessageConsumer implements RocketMQListener<MessageExt> {
|
||||
//视频上传消息会包含视频的内容
|
||||
Vlog vlog = vlogService.getById((String)mqMessage.getData());
|
||||
//获取文件内容
|
||||
if(ObjectUtil.isNull(vlog)){
|
||||
return false;
|
||||
}
|
||||
String fileId=vlog.getFileId();
|
||||
//检查该文件是否为oss文件,如果不是说明已经上传过了
|
||||
if(!vlog.getUrl().contains("#")){
|
||||
@ -89,12 +94,14 @@ public class VlogUploadMessageConsumer implements RocketMQListener<MessageExt> {
|
||||
String storagePath = "/data/vlogdata";
|
||||
//从oss下载文件
|
||||
|
||||
SysOssVo sysOss =ossService.getById(Long.getLong(fileId)) ;
|
||||
SysOssVo sysOss =ossService.getById(Long.parseLong(fileId)) ;
|
||||
if (ObjectUtil.isNull(sysOss)) {
|
||||
throw new ServiceException("文件数据不存在!");
|
||||
}
|
||||
OssClient ossClient = OssFactory.instance(sysOss.getService());
|
||||
Path filePath=ossClient.fileDownload(sysOss.getUrl());
|
||||
Path filePath=ossClient.fileDownload(sysOss.getUrl(),sysOss.getOriginalName());
|
||||
//对文件进行重命名
|
||||
|
||||
|
||||
fileId = qcCloud.uploadViaTempFile(filePath);
|
||||
log.info("视频发布ID:" + fileId);
|
||||
|
@ -161,24 +161,15 @@ public class VlogServiceImpl extends ServiceImpl<VlogMapper, Vlog> implements Vl
|
||||
|
||||
|
||||
//发出mq消息,异步处理上传
|
||||
// MQMessage message = MQMessage.builder()
|
||||
// .messageType("json")
|
||||
// .data(vlog.getId())
|
||||
// .source("app")
|
||||
// .topic("VLOG_UPLOAD_TOPIC")
|
||||
// .tag("upload")
|
||||
// .sendTime(LocalDateTime.now())
|
||||
// .build();
|
||||
// MQMessage message = MQMessage.builder()
|
||||
// .topic("VLOG_UPLOAD_TOPIC")
|
||||
// .tag("upload")
|
||||
// .messageType("json")
|
||||
// .data("d")
|
||||
// .source("vlog_service")
|
||||
// .sendTime(LocalDateTime.now())
|
||||
// .build();
|
||||
MQMessage mqMessage = new MQMessage();
|
||||
MqUtil.sendMessage(RocketMQConfig.VLOG_UPLOAD_TOPIC+":"+"upload", mqMessage);
|
||||
MQMessage message = MQMessage.builder()
|
||||
.messageType("json")
|
||||
.data(vlog.getId())
|
||||
.source("app")
|
||||
.topic("VLOG_UPLOAD_TOPIC")
|
||||
.tag("upload")
|
||||
.sendTime(LocalDateTime.now())
|
||||
.build();
|
||||
MqUtil.sendMessage(RocketMQConfig.VLOG_UPLOAD_TOPIC+":"+"upload", message);
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user