添加直播间状态变化延时任务

This commit is contained in:
lifenlong 2021-06-01 17:23:32 +08:00
parent 6fb7322e3e
commit f802810330
10 changed files with 171 additions and 5 deletions

View File

@ -280,6 +280,8 @@ lili:
notice-group: lili_notice_group
notice-send-topic: lili_send_notice_topic
notice-send-group: lili_send_notice_group
broadcast-topic: lili_broadcast_topic
broadcast-group: lili_broadcast_group
rocketmq:
name-server: 192.168.0.116:9876
producer:

View File

@ -0,0 +1,36 @@
package cn.lili.trigger.executor;
import cn.hutool.json.JSONUtil;
import cn.lili.common.delayqueue.BroadcastMessage;
import cn.lili.common.trigger.interfaces.TimeTriggerExecutor;
import cn.lili.common.trigger.model.TimeExecuteConstant;
import cn.lili.modules.broadcast.service.StudioService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 直播间事件触发
*
* @author Bulbasaur
* @date: 2021/6/1 5:02 下午
*/
@Slf4j
@Component(TimeExecuteConstant.BROADCAST_EXECUTOR)
public class BroadcastTimeTriggerExecutor implements TimeTriggerExecutor {
@Autowired
private StudioService studioService;
@Override
public void execute(Object object) {
//直播间订单消息
BroadcastMessage broadcastMessage = JSONUtil.toBean(JSONUtil.parseObj(object), BroadcastMessage.class);
if (broadcastMessage != null && broadcastMessage.getStudioId() != null) {
log.info("直播间消费:{}", broadcastMessage);
// 修改直播间状态
studioService.updateStudioStatus(broadcastMessage);
}
}
}

View File

@ -279,6 +279,8 @@ lili:
notice-send-group: lili_send_notice_group
after-sale-topic: lili_after_sale_topic
after-sale-group: lili_after_sale_group
broadcast-topic: lili_broadcast_topic
broadcast-group: lili_broadcast_group
rocketmq:
name-server: 127.0.0.1:9876
producer:

View File

@ -0,0 +1,34 @@
package cn.lili.common.delayqueue;
import cn.hutool.core.date.DateTime;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* 直播消息实体
*
* @author Bulbasaur
* @date: 2021/6/1 4:48 下午
*/
@Data
@NoArgsConstructor
public class BroadcastMessage {
/**
* 直播间ID
*/
private String studioId;
/**
* 状态
*/
private String status;
public BroadcastMessage(String studioId, String status) {
this.studioId = studioId;
this.status = status;
}
}

View File

@ -15,7 +15,12 @@ public enum DelayQueueType {
/**
* 拼团订单
*/
PINTUAN_ORDER("拼团订单");
PINTUAN_ORDER("拼团订单"),
/**
* 直播
*/
BROADCAST("直播");
private final String description;

View File

@ -11,6 +11,11 @@ public abstract class TimeExecuteConstant {
*/
public static final String PROMOTION_EXECUTOR = "promotionTimeTriggerExecutor";
/**
* 直播间延迟加载执行器
*/
public static final String BROADCAST_EXECUTOR = "broadcastTimeTriggerExecutor";
/**
* 拼团延迟加载执行器
*/

View File

@ -61,4 +61,8 @@ public class RocketmqCustomProperties {
private String afterSaleGroup;
private String broadcastTopic;
private String broadcastGroup;
}

View File

@ -1,5 +1,6 @@
package cn.lili.modules.broadcast.service;
import cn.lili.common.delayqueue.BroadcastMessage;
import cn.lili.common.vo.PageVO;
import cn.lili.modules.broadcast.entity.dos.Studio;
import cn.lili.modules.broadcast.entity.vos.StudioVO;
@ -69,4 +70,10 @@ public interface StudioService extends IService<Studio> {
* @return 直播间分页
*/
IPage<Studio> studioList(PageVO pageVO, Integer recommend, String status);
/**
* 修改直播间状态
* @param broadcastMessage 直播间消息
*/
void updateStudioStatus(BroadcastMessage broadcastMessage);
}

View File

@ -1,15 +1,23 @@
package cn.lili.modules.broadcast.serviceimpl;
import cn.hutool.json.JSONUtil;
import cn.lili.common.delayqueue.BroadcastMessage;
import cn.lili.common.delayqueue.DelayQueueTools;
import cn.lili.common.delayqueue.DelayQueueType;
import cn.lili.common.enums.ResultCode;
import cn.lili.common.exception.ServiceException;
import cn.lili.common.security.context.UserContext;
import cn.lili.common.trigger.interfaces.TimeTrigger;
import cn.lili.common.trigger.model.TimeExecuteConstant;
import cn.lili.common.trigger.model.TimeTriggerMsg;
import cn.lili.common.utils.BeanUtil;
import cn.lili.common.utils.DateUtil;
import cn.lili.common.utils.PageUtil;
import cn.lili.common.vo.PageVO;
import cn.lili.modules.broadcast.entity.StudioStatusEnum;
import cn.lili.config.rocketmq.RocketmqCustomProperties;
import cn.lili.modules.broadcast.entity.dos.Studio;
import cn.lili.modules.broadcast.entity.dos.StudioCommodity;
import cn.lili.modules.broadcast.entity.enums.StudioStatusEnum;
import cn.lili.modules.broadcast.entity.vos.StudioVO;
import cn.lili.modules.broadcast.mapper.CommodityMapper;
import cn.lili.modules.broadcast.mapper.StudioMapper;
@ -18,6 +26,7 @@ import cn.lili.modules.broadcast.service.StudioService;
import cn.lili.modules.broadcast.util.WechatLivePlayerUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
@ -41,6 +50,12 @@ public class StudioServiceImpl extends ServiceImpl<StudioMapper, Studio> impleme
private StudioCommodityService studioCommodityService;
@Resource
private CommodityMapper commodityMapper;
//延时任务
@Autowired
private TimeTrigger timeTrigger;
//Rocketmq
@Autowired
private RocketmqCustomProperties rocketmqCustomProperties;
@Override
public Boolean create(Studio studio) {
@ -51,7 +66,28 @@ public class StudioServiceImpl extends ServiceImpl<StudioMapper, Studio> impleme
studio.setQrCodeUrl(roomMap.get("qrcodeUrl"));
studio.setStoreId(UserContext.getCurrentUser().getStoreId());
studio.setStatus(StudioStatusEnum.NEW.name());
return this.save(studio);
//直播间添加成功发送直播间开启关闭延时任务
if(this.save(studio)){
//直播开启延时任务
BroadcastMessage broadcastMessage = new BroadcastMessage(studio.getId(),StudioStatusEnum.START.name());
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.BROADCAST_EXECUTOR,
Long.parseLong(studio.getStartTime()), broadcastMessage,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.BROADCAST, studio.getId()),
rocketmqCustomProperties.getPromotionTopic());
// 发送促销活动开始的延时任务
this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(Long.parseLong(studio.getStartTime())));
//直播结束延时任务
broadcastMessage = new BroadcastMessage(studio.getId(),StudioStatusEnum.END.name());
timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.BROADCAST_EXECUTOR,
Long.parseLong(studio.getEndTime()), broadcastMessage,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.BROADCAST, studio.getId()),
rocketmqCustomProperties.getPromotionTopic());
// 发送促销活动开始的延时任务
this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(Long.parseLong(studio.getEndTime())));
}
return true;
} catch (Exception e) {
e.printStackTrace();
throw new ServiceException(ResultCode.ERROR);
@ -61,8 +97,33 @@ public class StudioServiceImpl extends ServiceImpl<StudioMapper, Studio> impleme
@Override
public Boolean edit(Studio studio) {
Studio oldStudio=this.getById(studio.getId());
wechatLivePlayerUtil.editRoom(studio);
return this.updateById(studio);
if(this.updateById(studio)){
// 发送更新延时任务
//直播间开始
BroadcastMessage broadcastMessage = new BroadcastMessage(studio.getId(),StudioStatusEnum.START.name());
this.timeTrigger.edit(
TimeExecuteConstant.BROADCAST_EXECUTOR,
broadcastMessage,
Long.parseLong(oldStudio.getStartTime()),
Long.parseLong(studio.getStartTime()),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.BROADCAST,studio.getId()),
DateUtil.getDelayTime(Long.parseLong(studio.getStartTime())),
rocketmqCustomProperties.getPromotionTopic());
//直播间结束
broadcastMessage = new BroadcastMessage(studio.getId(),StudioStatusEnum.START.name());
this.timeTrigger.edit(
TimeExecuteConstant.BROADCAST_EXECUTOR,
broadcastMessage,
Long.parseLong(oldStudio.getEndTime()),
Long.parseLong(studio.getEndTime()),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.BROADCAST,studio.getId()),
DateUtil.getDelayTime(Long.parseLong(studio.getEndTime())),
rocketmqCustomProperties.getPromotionTopic());
}
return true;
}
@Override
@ -132,6 +193,13 @@ public class StudioServiceImpl extends ServiceImpl<StudioMapper, Studio> impleme
}
@Override
public void updateStudioStatus(BroadcastMessage broadcastMessage) {
this.update(new LambdaUpdateWrapper<Studio>()
.eq(Studio::getId,broadcastMessage.getStudioId())
.set(Studio::getStatus,broadcastMessage.getStatus()));
}
/**
* 根据直播间ID获取直播间
*

View File

@ -97,7 +97,10 @@ public class FullDiscountServiceImpl extends ServiceImpl<FullDiscountMapper, Ful
}
// 保存到MONGO中
this.mongoTemplate.save(fullDiscountVO);
PromotionMessage promotionMessage = new PromotionMessage(fullDiscountVO.getId(), PromotionTypeEnum.FULL_DISCOUNT.name(), PromotionStatusEnum.START.name(), fullDiscountVO.getStartTime(), fullDiscountVO.getEndTime());
PromotionMessage promotionMessage = new PromotionMessage(fullDiscountVO.getId(), PromotionTypeEnum.FULL_DISCOUNT.name(),
PromotionStatusEnum.START.name(),
fullDiscountVO.getStartTime(), fullDiscountVO.getEndTime());
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR,
fullDiscountVO.getStartTime().getTime(), promotionMessage,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),