diff --git a/config/application.yml b/config/application.yml index 08e4e29b..08f94a4f 100644 --- a/config/application.yml +++ b/config/application.yml @@ -280,6 +280,8 @@ lili: notice-group: lili_notice_group notice-send-topic: lili_send_notice_topic notice-send-group: lili_send_notice_group + broadcast-topic: lili_broadcast_topic + broadcast-group: lili_broadcast_group rocketmq: name-server: 192.168.0.116:9876 producer: diff --git a/consumer/src/main/java/cn/lili/trigger/executor/BroadcastTimeTriggerExecutor.java b/consumer/src/main/java/cn/lili/trigger/executor/BroadcastTimeTriggerExecutor.java new file mode 100644 index 00000000..0bc67f6c --- /dev/null +++ b/consumer/src/main/java/cn/lili/trigger/executor/BroadcastTimeTriggerExecutor.java @@ -0,0 +1,36 @@ +package cn.lili.trigger.executor; + +import cn.hutool.json.JSONUtil; +import cn.lili.common.delayqueue.BroadcastMessage; +import cn.lili.common.trigger.interfaces.TimeTriggerExecutor; +import cn.lili.common.trigger.model.TimeExecuteConstant; +import cn.lili.modules.broadcast.service.StudioService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 直播间事件触发 + * + * @author Bulbasaur + * @date: 2021/6/1 5:02 下午 + */ +@Slf4j +@Component(TimeExecuteConstant.BROADCAST_EXECUTOR) +public class BroadcastTimeTriggerExecutor implements TimeTriggerExecutor { + + + @Autowired + private StudioService studioService; + + @Override + public void execute(Object object) { + //直播间订单消息 + BroadcastMessage broadcastMessage = JSONUtil.toBean(JSONUtil.parseObj(object), BroadcastMessage.class); + if (broadcastMessage != null && broadcastMessage.getStudioId() != null) { + log.info("直播间消费:{}", broadcastMessage); + // 修改直播间状态 + studioService.updateStudioStatus(broadcastMessage); + } + } +} diff --git a/consumer/src/main/resources/application.yml b/consumer/src/main/resources/application.yml index 79a1af45..4b27771b 100644 --- a/consumer/src/main/resources/application.yml +++ b/consumer/src/main/resources/application.yml @@ -279,6 +279,8 @@ lili: notice-send-group: lili_send_notice_group after-sale-topic: lili_after_sale_topic after-sale-group: lili_after_sale_group + broadcast-topic: lili_broadcast_topic + broadcast-group: lili_broadcast_group rocketmq: name-server: 127.0.0.1:9876 producer: diff --git a/framework/src/main/java/cn/lili/common/delayqueue/BroadcastMessage.java b/framework/src/main/java/cn/lili/common/delayqueue/BroadcastMessage.java new file mode 100644 index 00000000..42e96cbf --- /dev/null +++ b/framework/src/main/java/cn/lili/common/delayqueue/BroadcastMessage.java @@ -0,0 +1,34 @@ +package cn.lili.common.delayqueue; + +import cn.hutool.core.date.DateTime; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * 直播消息实体 + * + * @author Bulbasaur + * @date: 2021/6/1 4:48 下午 + */ +@Data +@NoArgsConstructor +public class BroadcastMessage { + + /** + * 直播间ID + */ + private String studioId; + + /** + * 状态 + */ + private String status; + + + public BroadcastMessage(String studioId, String status) { + this.studioId = studioId; + this.status = status; + } +} diff --git a/framework/src/main/java/cn/lili/common/delayqueue/DelayQueueType.java b/framework/src/main/java/cn/lili/common/delayqueue/DelayQueueType.java index bb5c1a87..86a2e147 100644 --- a/framework/src/main/java/cn/lili/common/delayqueue/DelayQueueType.java +++ b/framework/src/main/java/cn/lili/common/delayqueue/DelayQueueType.java @@ -15,7 +15,12 @@ public enum DelayQueueType { /** * 拼团订单 */ - PINTUAN_ORDER("拼团订单"); + PINTUAN_ORDER("拼团订单"), + + /** + * 直播 + */ + BROADCAST("直播"); private final String description; diff --git a/framework/src/main/java/cn/lili/common/trigger/model/TimeExecuteConstant.java b/framework/src/main/java/cn/lili/common/trigger/model/TimeExecuteConstant.java index 6438de52..82bd8754 100644 --- a/framework/src/main/java/cn/lili/common/trigger/model/TimeExecuteConstant.java +++ b/framework/src/main/java/cn/lili/common/trigger/model/TimeExecuteConstant.java @@ -11,6 +11,11 @@ public abstract class TimeExecuteConstant { */ public static final String PROMOTION_EXECUTOR = "promotionTimeTriggerExecutor"; + /** + * 直播间延迟加载执行器 + */ + public static final String BROADCAST_EXECUTOR = "broadcastTimeTriggerExecutor"; + /** * 拼团延迟加载执行器 */ diff --git a/framework/src/main/java/cn/lili/config/rocketmq/RocketmqCustomProperties.java b/framework/src/main/java/cn/lili/config/rocketmq/RocketmqCustomProperties.java index 9f17158f..daedfe43 100644 --- a/framework/src/main/java/cn/lili/config/rocketmq/RocketmqCustomProperties.java +++ b/framework/src/main/java/cn/lili/config/rocketmq/RocketmqCustomProperties.java @@ -61,4 +61,8 @@ public class RocketmqCustomProperties { private String afterSaleGroup; + private String broadcastTopic; + + private String broadcastGroup; + } diff --git a/framework/src/main/java/cn/lili/modules/broadcast/service/StudioService.java b/framework/src/main/java/cn/lili/modules/broadcast/service/StudioService.java index 9e6bba7e..0acb351b 100644 --- a/framework/src/main/java/cn/lili/modules/broadcast/service/StudioService.java +++ b/framework/src/main/java/cn/lili/modules/broadcast/service/StudioService.java @@ -1,5 +1,6 @@ package cn.lili.modules.broadcast.service; +import cn.lili.common.delayqueue.BroadcastMessage; import cn.lili.common.vo.PageVO; import cn.lili.modules.broadcast.entity.dos.Studio; import cn.lili.modules.broadcast.entity.vos.StudioVO; @@ -69,4 +70,10 @@ public interface StudioService extends IService { * @return 直播间分页 */ IPage studioList(PageVO pageVO, Integer recommend, String status); + + /** + * 修改直播间状态 + * @param broadcastMessage 直播间消息 + */ + void updateStudioStatus(BroadcastMessage broadcastMessage); } diff --git a/framework/src/main/java/cn/lili/modules/broadcast/serviceimpl/StudioServiceImpl.java b/framework/src/main/java/cn/lili/modules/broadcast/serviceimpl/StudioServiceImpl.java index 4530d8d4..ad3bf81b 100644 --- a/framework/src/main/java/cn/lili/modules/broadcast/serviceimpl/StudioServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/broadcast/serviceimpl/StudioServiceImpl.java @@ -1,15 +1,23 @@ package cn.lili.modules.broadcast.serviceimpl; import cn.hutool.json.JSONUtil; +import cn.lili.common.delayqueue.BroadcastMessage; +import cn.lili.common.delayqueue.DelayQueueTools; +import cn.lili.common.delayqueue.DelayQueueType; import cn.lili.common.enums.ResultCode; import cn.lili.common.exception.ServiceException; import cn.lili.common.security.context.UserContext; +import cn.lili.common.trigger.interfaces.TimeTrigger; +import cn.lili.common.trigger.model.TimeExecuteConstant; +import cn.lili.common.trigger.model.TimeTriggerMsg; import cn.lili.common.utils.BeanUtil; +import cn.lili.common.utils.DateUtil; import cn.lili.common.utils.PageUtil; import cn.lili.common.vo.PageVO; -import cn.lili.modules.broadcast.entity.StudioStatusEnum; +import cn.lili.config.rocketmq.RocketmqCustomProperties; import cn.lili.modules.broadcast.entity.dos.Studio; import cn.lili.modules.broadcast.entity.dos.StudioCommodity; +import cn.lili.modules.broadcast.entity.enums.StudioStatusEnum; import cn.lili.modules.broadcast.entity.vos.StudioVO; import cn.lili.modules.broadcast.mapper.CommodityMapper; import cn.lili.modules.broadcast.mapper.StudioMapper; @@ -18,6 +26,7 @@ import cn.lili.modules.broadcast.service.StudioService; import cn.lili.modules.broadcast.util.WechatLivePlayerUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.springframework.beans.factory.annotation.Autowired; @@ -41,6 +50,12 @@ public class StudioServiceImpl extends ServiceImpl impleme private StudioCommodityService studioCommodityService; @Resource private CommodityMapper commodityMapper; + //延时任务 + @Autowired + private TimeTrigger timeTrigger; + //Rocketmq + @Autowired + private RocketmqCustomProperties rocketmqCustomProperties; @Override public Boolean create(Studio studio) { @@ -51,7 +66,28 @@ public class StudioServiceImpl extends ServiceImpl impleme studio.setQrCodeUrl(roomMap.get("qrcodeUrl")); studio.setStoreId(UserContext.getCurrentUser().getStoreId()); studio.setStatus(StudioStatusEnum.NEW.name()); - return this.save(studio); + //直播间添加成功发送直播间开启、关闭延时任务 + if(this.save(studio)){ + //直播开启延时任务 + BroadcastMessage broadcastMessage = new BroadcastMessage(studio.getId(),StudioStatusEnum.START.name()); + TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.BROADCAST_EXECUTOR, + Long.parseLong(studio.getStartTime()), broadcastMessage, + DelayQueueTools.wrapperUniqueKey(DelayQueueType.BROADCAST, studio.getId()), + rocketmqCustomProperties.getPromotionTopic()); + // 发送促销活动开始的延时任务 + this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(Long.parseLong(studio.getStartTime()))); + + //直播结束延时任务 + broadcastMessage = new BroadcastMessage(studio.getId(),StudioStatusEnum.END.name()); + timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.BROADCAST_EXECUTOR, + Long.parseLong(studio.getEndTime()), broadcastMessage, + DelayQueueTools.wrapperUniqueKey(DelayQueueType.BROADCAST, studio.getId()), + rocketmqCustomProperties.getPromotionTopic()); + // 发送促销活动开始的延时任务 + this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(Long.parseLong(studio.getEndTime()))); + } + return true; + } catch (Exception e) { e.printStackTrace(); throw new ServiceException(ResultCode.ERROR); @@ -61,8 +97,33 @@ public class StudioServiceImpl extends ServiceImpl impleme @Override public Boolean edit(Studio studio) { + Studio oldStudio=this.getById(studio.getId()); wechatLivePlayerUtil.editRoom(studio); - return this.updateById(studio); + if(this.updateById(studio)){ + // 发送更新延时任务 + //直播间开始 + BroadcastMessage broadcastMessage = new BroadcastMessage(studio.getId(),StudioStatusEnum.START.name()); + this.timeTrigger.edit( + TimeExecuteConstant.BROADCAST_EXECUTOR, + broadcastMessage, + Long.parseLong(oldStudio.getStartTime()), + Long.parseLong(studio.getStartTime()), + DelayQueueTools.wrapperUniqueKey(DelayQueueType.BROADCAST,studio.getId()), + DateUtil.getDelayTime(Long.parseLong(studio.getStartTime())), + rocketmqCustomProperties.getPromotionTopic()); + + //直播间结束 + broadcastMessage = new BroadcastMessage(studio.getId(),StudioStatusEnum.START.name()); + this.timeTrigger.edit( + TimeExecuteConstant.BROADCAST_EXECUTOR, + broadcastMessage, + Long.parseLong(oldStudio.getEndTime()), + Long.parseLong(studio.getEndTime()), + DelayQueueTools.wrapperUniqueKey(DelayQueueType.BROADCAST,studio.getId()), + DateUtil.getDelayTime(Long.parseLong(studio.getEndTime())), + rocketmqCustomProperties.getPromotionTopic()); + } + return true; } @Override @@ -132,6 +193,13 @@ public class StudioServiceImpl extends ServiceImpl impleme } + @Override + public void updateStudioStatus(BroadcastMessage broadcastMessage) { + this.update(new LambdaUpdateWrapper() + .eq(Studio::getId,broadcastMessage.getStudioId()) + .set(Studio::getStatus,broadcastMessage.getStatus())); + } + /** * 根据直播间ID获取直播间 * diff --git a/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/FullDiscountServiceImpl.java b/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/FullDiscountServiceImpl.java index 06e9081c..d38aa4e8 100644 --- a/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/FullDiscountServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/FullDiscountServiceImpl.java @@ -97,7 +97,10 @@ public class FullDiscountServiceImpl extends ServiceImpl