延时任务规范,任务调度机制,修改为consumer执行,不再被每个进程监控。

This commit is contained in:
Chopper 2021-06-11 11:59:21 +08:00
parent 9195292aa9
commit ebebbcac69
22 changed files with 271 additions and 233 deletions

View File

@ -0,0 +1,94 @@
package cn.lili.trigger;
import cn.hutool.json.JSONUtil;
import cn.lili.common.cache.Cache;
import cn.lili.common.utils.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 延时队列工厂
*
* @author paulG
* @since 2020/11/7
**/
@Slf4j
public abstract class AbstractDelayQueueListen {
@Autowired
private Cache cache;
/**
* 延时队列机器开始运作
*/
private void startDelayQueueMachine() {
log.info("延时队列机器{}开始运作", setDelayQueueName());
// 监听redis队列
while (true) {
try {
// 获取当前时间的时间戳
long now = System.currentTimeMillis() / 1000;
// 获取当前时间前需要执行的任务列表
Set<DefaultTypedTuple> tuples = cache.zRangeByScore(setDelayQueueName(), 0, now);
// 如果任务不为空
if (!CollectionUtils.isEmpty(tuples)) {
log.info("执行任务:{}", JSONUtil.toJsonStr(tuples));
for (DefaultTypedTuple tuple : tuples) {
String jobId = (String) tuple.getValue();
// 移除缓存如果移除成功则表示当前线程处理了延时任务则执行延时任务
Long num = cache.zRemove(setDelayQueueName(), jobId);
// 如果移除成功, 则执行
if (num > 0) {
ThreadPoolUtil.execute(() -> invoke(jobId));
}
}
}
} catch (Exception e) {
log.error("处理延时任务发生异常,异常原因为{}", e.getMessage(), e);
} finally {
// 间隔一秒钟搞一次
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 最终执行的任务方法
*
* @param jobId 任务id
*/
public abstract void invoke(String jobId);
/**
* 要实现延时队列的名字
*/
public abstract String setDelayQueueName();
/**
* 监听队列
*/
@PostConstruct
public void init() {
new Thread(this::startDelayQueueMachine).start();
}
}

View File

@ -2,9 +2,8 @@ package cn.lili.trigger;
import cn.hutool.json.JSONUtil;
import cn.lili.common.cache.Cache;
import cn.lili.common.trigger.interfaces.TimeTriggerExecutor;
import cn.lili.common.trigger.model.TimeTriggerMsg;
import cn.lili.common.trigger.util.TimeTriggerUtil;
import cn.lili.common.trigger.util.DelayQueueTools;
import cn.lili.common.utils.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@ -28,7 +27,7 @@ public class TimeTriggerConsumer implements RocketMQListener<TimeTriggerMsg> {
@Override
public void onMessage(TimeTriggerMsg timeTriggerMsg) {
try {
String key = TimeTriggerUtil.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey());
String key = DelayQueueTools.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey());
if (cache.get(key) == null) {
log.info("执行器执行被取消:{} | 任务标识:{}", timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getUniqueKey());

View File

@ -1,4 +1,4 @@
package cn.lili.common.trigger.interfaces;
package cn.lili.trigger;
/**
* 延时任务执行器接口

View File

@ -1,13 +1,12 @@
package cn.lili.trigger.executor;
import cn.hutool.json.JSONUtil;
import cn.lili.common.trigger.interfaces.TimeTrigger;
import cn.lili.trigger.TimeTriggerExecutor;
import cn.lili.common.trigger.message.PintuanOrderMessage;
import cn.lili.common.trigger.message.PromotionMessage;
import cn.lili.common.trigger.interfaces.TimeTrigger;
import cn.lili.common.trigger.interfaces.TimeTriggerExecutor;
import cn.lili.common.trigger.model.TimeExecuteConstant;
import cn.lili.common.trigger.model.TimeTriggerMsg;
import cn.lili.common.utils.DateUtil;
import cn.lili.config.rocketmq.RocketmqCustomProperties;
import cn.lili.modules.order.order.service.OrderService;
import cn.lili.modules.promotion.entity.enums.PromotionStatusEnum;
@ -58,8 +57,10 @@ public class PromotionTimeTriggerExecutor implements TimeTriggerExecutor {
// 结束时间延时一分钟
long closeTime = promotionMessage.getEndTime().getTime() + 60000;
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR, closeTime, promotionMessage, uniqueKey, rocketmqCustomProperties.getPromotionTopic());
timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(promotionMessage.getEndTime().getTime()));
//添加延时任务
timeTrigger.addDelay(timeTriggerMsg);
} else {
//不是开始则修改活动状态
promotionService.updatePromotionStatus(promotionMessage);
}
return;

View File

@ -0,0 +1,34 @@
package cn.lili.trigger.listen;
import cn.hutool.json.JSONUtil;
import cn.lili.common.trigger.enums.DelayQueueEnums;
import cn.lili.common.trigger.interfaces.TimeTrigger;
import cn.lili.common.trigger.model.TimeTriggerMsg;
import cn.lili.trigger.AbstractDelayQueueListen;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* PromotionTimeTriggerListen
*
* @author Chopper
* @version v1.0
* 2021-06-11 10:47
*/
@Component
public class PromotionDelayQueueListen extends AbstractDelayQueueListen {
@Autowired
private TimeTrigger timeTrigger;
@Override
public void invoke(String jobId) {
timeTrigger.execute(JSONUtil.toBean(jobId, TimeTriggerMsg.class));
}
@Override
public String setDelayQueueName() {
return DelayQueueEnums.PROMOTION_QUEUE.name();
}
}

View File

@ -1,17 +1,9 @@
package cn.lili.common.trigger.delay;
import cn.hutool.json.JSONUtil;
import cn.lili.common.cache.Cache;
import cn.lili.common.utils.ThreadPoolUtil;
import cn.lili.common.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.Calendar;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 延时队列工厂
@ -29,82 +21,24 @@ public abstract class AbstractDelayQueueMachineFactory {
* 插入任务id
*
* @param jobId 任务id(队列内唯一)
* @param time 延时时间(单位 :)
* @param triggerTime 执行时间 时间戳毫秒
* @return 是否插入成功
*/
public boolean addJob(String jobId, Integer time) {
//获取时间
Calendar instance = Calendar.getInstance();
instance.add(Calendar.SECOND, time);
long delaySeconds = instance.getTimeInMillis() / 1000;
public boolean addJob(String jobId, Long triggerTime) {
//redis 中排序时间
long delaySeconds = triggerTime / 1000;
//增加延时任务 参数依次为队列名称执行时间任务id
boolean result = cache.zAdd(setDelayQueueName(), delaySeconds, jobId);
log.info("增加延时任务, 缓存key {}, 等待时间 {}", setDelayQueueName(), time);
log.info("增加延时任务, 缓存key {}, 执行时间 {},任务id {}", setDelayQueueName(), DateUtil.toString(triggerTime), jobId);
return result;
}
/**
* 延时队列机器开始运作
*/
private void startDelayQueueMachine() {
log.info("延时队列机器{}开始运作", setDelayQueueName());
// 监听redis队列
while (true) {
try {
// 获取当前时间的时间戳
long now = System.currentTimeMillis() / 1000;
// 获取当前时间前的任务列表
Set<DefaultTypedTuple> tuples = cache.zRangeByScore(setDelayQueueName(), 0, now);
// 如果任务不为空
if (!CollectionUtils.isEmpty(tuples)) {
log.info("执行任务:{}", JSONUtil.toJsonStr(tuples));
for (DefaultTypedTuple tuple : tuples) {
String jobId = (String) tuple.getValue();
// 移除缓存如果移除成功则表示当前线程处理了延时任务则执行延时任务
Long num = cache.zRemove(setDelayQueueName(), jobId);
// 如果移除成功, 则执行
if (num > 0) {
ThreadPoolUtil.execute(() -> invoke(jobId));
}
}
}
} catch (Exception e) {
log.error("处理延时任务发生异常,异常原因为{}", e.getMessage(), e);
} finally {
// 间隔一秒钟搞一次
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 最终执行的任务方法
*
* @param jobId 任务id
*/
public abstract void invoke(String jobId);
/**
* 要实现延时队列的名字
*/
public abstract String setDelayQueueName();
@PostConstruct
public void init() {
new Thread(this::startDelayQueueMachine).start();
}
}

View File

@ -1,8 +1,8 @@
package cn.lili.common.trigger.delay;
package cn.lili.common.trigger.delay.queue;
import cn.hutool.json.JSONUtil;
import cn.lili.common.trigger.delay.AbstractDelayQueueMachineFactory;
import cn.lili.common.trigger.enums.DelayQueueEnums;
import cn.lili.common.trigger.interfaces.TimeTrigger;
import cn.lili.common.trigger.model.TimeTriggerMsg;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -21,13 +21,8 @@ public class PromotionDelayQueue extends AbstractDelayQueueMachineFactory {
@Autowired
private TimeTrigger timeTrigger;
@Override
public void invoke(String jobId) {
timeTrigger.add(JSONUtil.toBean(jobId, TimeTriggerMsg.class));
}
@Override
public String setDelayQueueName() {
return "promotion_delay";
return DelayQueueEnums.PROMOTION_QUEUE.name();
}
}

View File

@ -0,0 +1,19 @@
package cn.lili.common.trigger.enums;
/**
* 队列枚举
*/
public enum DelayQueueEnums {
/**
* 促销任务队列
*/
PROMOTION_QUEUE("促销任务队列");
private String description;
DelayQueueEnums(String description) {
this.description = description;
}
}

View File

@ -5,8 +5,8 @@ package cn.lili.common.trigger.enums;
*
* @author paulG
* @since 2021/5/7
**/
public enum DelayQueueType {
*/
public enum PromotionDelayTypeEnums {
/**
* 促销活动
@ -17,14 +17,10 @@ public enum DelayQueueType {
*/
PINTUAN_ORDER("拼团订单");
private final String description;
private String description;
DelayQueueType(String des) {
this.description = des;
}
public String description() {
return this.description;
PromotionDelayTypeEnums(String description) {
this.description = description;
}
}

View File

@ -9,22 +9,20 @@ import cn.lili.common.trigger.model.TimeTriggerMsg;
*/
public interface TimeTrigger {
/**
* 添加延时任务
*
* @param timeTriggerMsg 延时任务信息
*/
void add(TimeTriggerMsg timeTriggerMsg);
void addDelay(TimeTriggerMsg timeTriggerMsg);
/**
* 添加延时任务
* 执行延时任务
*
* @param timeTriggerMsg 延时任务信息
* @param delayTime 延时时间
*/
void addDelay(TimeTriggerMsg timeTriggerMsg, int delayTime);
void execute(TimeTriggerMsg timeTriggerMsg);
/**
* 修改延时任务

View File

@ -1,12 +1,12 @@
package cn.lili.common.trigger;
package cn.lili.common.trigger.interfaces.impl;
import cn.hutool.json.JSONUtil;
import cn.lili.common.cache.Cache;
import cn.lili.common.rocketmq.RocketmqSendCallbackBuilder;
import cn.lili.common.trigger.delay.PromotionDelayQueue;
import cn.lili.common.trigger.delay.queue.PromotionDelayQueue;
import cn.lili.common.trigger.interfaces.TimeTrigger;
import cn.lili.common.trigger.model.TimeTriggerMsg;
import cn.lili.common.trigger.util.TimeTriggerUtil;
import cn.lili.common.trigger.util.DelayQueueTools;
import cn.lili.common.utils.DateUtil;
import cn.lili.common.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
@ -34,12 +34,38 @@ public class RocketmqTimerTrigger implements TimeTrigger {
@Override
public void add(TimeTriggerMsg timeTriggerMsg) {
this.addExecute(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getParam(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey(), timeTriggerMsg.getTopic());
public void addDelay(TimeTriggerMsg timeTriggerMsg) {
//执行器唯一key
String uniqueKey = timeTriggerMsg.getUniqueKey();
if (StringUtils.isEmpty(uniqueKey)) {
uniqueKey = StringUtils.getRandStr(10);
}
//执行任务key
String generateKey = DelayQueueTools.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), uniqueKey);
this.cache.put(generateKey, 1);
//设置延时任务
if (Boolean.TRUE.equals(promotionDelayQueue.addJob(JSONUtil.toJsonStr(timeTriggerMsg), timeTriggerMsg.getTriggerTime()))) {
log.info("延时任务标识: {}", generateKey);
log.info("定时执行在【" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "】,消费【" + timeTriggerMsg.getParam().toString() + "");
} else {
log.error("延时任务添加失败:{}", timeTriggerMsg);
}
}
@Override
public void execute(TimeTriggerMsg timeTriggerMsg) {
this.addExecute(timeTriggerMsg.getTriggerExecutor(),
timeTriggerMsg.getParam(),
timeTriggerMsg.getTriggerTime(),
timeTriggerMsg.getUniqueKey(),
timeTriggerMsg.getTopic()
);
}
/**
* 添加延时任务
* 将任务添加到mqmq异步队列执行
* <p>
* 本系统中redis相当于延时任务吊起机制而mq才是实际的业务消费执行任务的存在
*
* @param executorName 执行器beanId
* @param param 执行参数
@ -50,7 +76,7 @@ public class RocketmqTimerTrigger implements TimeTrigger {
* 业务内全局唯一
* @param topic rocketmq topic
*/
public void addExecute(String executorName, Object param, Long triggerTime, String uniqueKey, String topic) {
private void addExecute(String executorName, Object param, Long triggerTime, String uniqueKey, String topic) {
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executorName, triggerTime, param, uniqueKey, topic);
Message<TimeTriggerMsg> message = MessageBuilder.withPayload(timeTriggerMsg).build();
@ -58,34 +84,15 @@ public class RocketmqTimerTrigger implements TimeTrigger {
this.rocketMQTemplate.asyncSend(topic, message, RocketmqSendCallbackBuilder.commonCallback());
}
@Override
public void addDelay(TimeTriggerMsg timeTriggerMsg, int delayTime) {
//执行器唯一key
String uniqueKey = timeTriggerMsg.getUniqueKey();
if (StringUtils.isEmpty(uniqueKey)) {
uniqueKey = StringUtils.getRandStr(10);
}
//执行任务key
String generateKey = TimeTriggerUtil.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), uniqueKey);
this.cache.put(generateKey, 1);
//设置延时任务
if (Boolean.TRUE.equals(promotionDelayQueue.addJob(JSONUtil.toJsonStr(timeTriggerMsg), delayTime))) {
log.info("add Redis key {}", generateKey);
log.info("定时执行在【" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "】,消费【" + timeTriggerMsg.getParam().toString() + "");
} else {
log.error("延时任务添加失败:{}", timeTriggerMsg);
}
}
@Override
public void edit(String executorName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey, int delayTime, String topic) {
this.delete(executorName, oldTriggerTime, uniqueKey, topic);
this.addDelay(new TimeTriggerMsg(executorName, triggerTime, param, uniqueKey, topic), delayTime);
this.addDelay(new TimeTriggerMsg(executorName, triggerTime, param, uniqueKey, topic));
}
@Override
public void delete(String executorName, Long triggerTime, String uniqueKey, String topic) {
String generateKey = TimeTriggerUtil.generateKey(executorName, triggerTime, uniqueKey);
String generateKey = DelayQueueTools.generateKey(executorName, triggerTime, uniqueKey);
log.info("删除延时任务{}", generateKey);
this.cache.remove(generateKey);
}

View File

@ -1,6 +1,6 @@
package cn.lili.common.trigger.util;
import cn.lili.common.trigger.enums.DelayQueueType;
import cn.lili.common.trigger.enums.PromotionDelayTypeEnums;
/**
* 延时任务工具类
@ -10,6 +10,11 @@ import cn.lili.common.trigger.enums.DelayQueueType;
**/
public class DelayQueueTools {
/**
* 前缀
*/
private static final String PREFIX = "{rocketmq_trigger}_";
/**
* 组装延时任务唯一键
*
@ -17,8 +22,21 @@ public class DelayQueueTools {
* @param id id
* @return 唯一键
*/
public static String wrapperUniqueKey(DelayQueueType type, String id) {
public static String wrapperUniqueKey(PromotionDelayTypeEnums type, String id) {
return "{TIME_TRIGGER_" + type.name() + "}_" + id;
}
/**
* 生成延时任务标识key
*
* @param executorName 执行器beanId
* @param triggerTime 执行时间
* @param uniqueKey 自定义表示
* @return 延时任务标识key
*/
public static String generateKey(String executorName, Long triggerTime, String uniqueKey) {
return PREFIX + (executorName + triggerTime + uniqueKey).hashCode();
}
}

View File

@ -1,28 +0,0 @@
package cn.lili.common.trigger.util;
/**
* 延时任务mq实现内容提供加密算法以及任务前缀参数
*
* @author Chopper
*/
public class TimeTriggerUtil {
/**
* 前缀
*/
private static final String PREFIX = "{rocketmq_trigger}_";
/**
* 生成延时任务标识key
*
* @param executorName 执行器beanId
* @param triggerTime 执行时间
* @param uniqueKey 自定义表示
* @return 延时任务标识key
*/
public static String generateKey(String executorName, Long triggerTime, String uniqueKey) {
return PREFIX + (executorName + triggerTime + uniqueKey).hashCode();
}
}

View File

@ -200,6 +200,15 @@ public class DateUtil {
public static String toString(Date date) {
return toString(date,STANDARD_FORMAT);
}
/**
* 把日期转换成字符串型
*
* @param Long 日期
* @return
*/
public static String toString(Long date) {
return toString(date,STANDARD_FORMAT);
}
/**
* 把日期转换成字符串型
*

View File

@ -39,6 +39,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.lang.reflect.Field;
import java.util.*;
import java.util.stream.Collectors;
@ -194,9 +195,7 @@ public class GoodsSkuServiceImpl extends ServiceImpl<GoodsSkuMapper, GoodsSku> i
if (goodsSku == null) {
throw new ServiceException("商品已下架");
}
}
// 获取当前商品的索引信息
EsGoodsIndex goodsIndex = goodsIndexService.findById(skuId);
if (goodsIndex == null) {

View File

@ -6,7 +6,7 @@ import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import cn.lili.common.aop.syslog.annotation.SystemLogPoint;
import cn.lili.common.trigger.util.DelayQueueTools;
import cn.lili.common.trigger.enums.DelayQueueType;
import cn.lili.common.trigger.enums.PromotionDelayTypeEnums;
import cn.lili.common.trigger.message.PintuanOrderMessage;
import cn.lili.common.enums.ResultCode;
import cn.lili.common.exception.ServiceException;
@ -555,9 +555,9 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR,
startTime,
pintuanOrderMessage,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PINTUAN_ORDER, (pintuanId + parentOrderSn)),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PINTUAN_ORDER, (pintuanId + parentOrderSn)),
rocketmqCustomProperties.getPromotionTopic());
this.timeTrigger.addDelay(timeTriggerMsg, cn.lili.common.utils.DateUtil.getDelayTime(startTime));
this.timeTrigger.addDelay(timeTriggerMsg);
}
//拼团所需人数小于等于 参团后的人数则说明成团所有订单成团
if (pintuan.getRequiredNum() <= count) {

View File

@ -2,7 +2,7 @@ package cn.lili.modules.promotion.serviceimpl;
import cn.hutool.core.text.CharSequenceUtil;
import cn.lili.common.trigger.util.DelayQueueTools;
import cn.lili.common.trigger.enums.DelayQueueType;
import cn.lili.common.trigger.enums.PromotionDelayTypeEnums;
import cn.lili.common.trigger.message.PromotionMessage;
import cn.lili.common.exception.ServiceException;
import cn.lili.common.trigger.interfaces.TimeTrigger;
@ -53,7 +53,6 @@ import java.util.stream.Collectors;
*/
@Service
@Transactional(rollbackFor = Exception.class)
public class CouponServiceImpl extends ServiceImpl<CouponMapper, Coupon> implements CouponService {
//延时任务
@ -94,10 +93,10 @@ public class CouponServiceImpl extends ServiceImpl<CouponMapper, Coupon> impleme
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR,
coupon.getStartTime().getTime(),
promotionMessage,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
rocketmqCustomProperties.getPromotionTopic());
// 发送促销活动开始的延时任务
this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(coupon.getStartTime().getTime()));
this.timeTrigger.addDelay(timeTriggerMsg);
return coupon;
}
@ -117,7 +116,7 @@ public class CouponServiceImpl extends ServiceImpl<CouponMapper, Coupon> impleme
this.timeTrigger.edit(TimeExecuteConstant.PROMOTION_EXECUTOR,
promotionMessage,
coupon.getStartTime().getTime(), couponVO.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DateUtil.getDelayTime(couponVO.getStartTime().getTime()),
rocketmqCustomProperties.getPromotionTopic());
return couponVO;
@ -145,7 +144,7 @@ public class CouponServiceImpl extends ServiceImpl<CouponMapper, Coupon> impleme
this.timeTrigger.edit(TimeExecuteConstant.PROMOTION_EXECUTOR,
promotionMessage,
couponVO.getStartTime().getTime(), couponVO.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DateUtil.getDelayTime(couponVO.getStartTime().getTime()),
rocketmqCustomProperties.getPromotionTopic());
}
@ -167,7 +166,7 @@ public class CouponServiceImpl extends ServiceImpl<CouponMapper, Coupon> impleme
this.mongoTemplate.remove(new Query().addCriteria(Criteria.where("id").is(id)), CouponVO.class);
this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR,
couponVO.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.COUPON.name() + couponVO.getId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.COUPON.name() + couponVO.getId())),
rocketmqCustomProperties.getPromotionTopic());
return result;
}

View File

@ -1,7 +1,7 @@
package cn.lili.modules.promotion.serviceimpl;
import cn.lili.common.trigger.util.DelayQueueTools;
import cn.lili.common.trigger.enums.DelayQueueType;
import cn.lili.common.trigger.enums.PromotionDelayTypeEnums;
import cn.lili.common.trigger.message.PromotionMessage;
import cn.lili.common.exception.ServiceException;
import cn.lili.common.trigger.interfaces.TimeTrigger;
@ -103,10 +103,10 @@ public class FullDiscountServiceImpl extends ServiceImpl<FullDiscountMapper, Ful
PromotionMessage promotionMessage = new PromotionMessage(fullDiscountVO.getId(), PromotionTypeEnum.FULL_DISCOUNT.name(), PromotionStatusEnum.START.name(), fullDiscountVO.getStartTime(), fullDiscountVO.getEndTime());
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR,
fullDiscountVO.getStartTime().getTime(), promotionMessage,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
rocketmqCustomProperties.getPromotionTopic());
// 发送促销活动开始的延时任务
this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(fullDiscountVO.getStartTime().getTime()));
this.timeTrigger.addDelay(timeTriggerMsg);
return fullDiscountVO;
}
@ -159,7 +159,7 @@ public class FullDiscountServiceImpl extends ServiceImpl<FullDiscountMapper, Ful
// 发送更新延时任务
this.timeTrigger.edit(TimeExecuteConstant.PROMOTION_EXECUTOR, promotionMessage,
fullDiscount.getStartTime().getTime(), fullDiscountVO.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DateUtil.getDelayTime(fullDiscountVO.getStartTime().getTime()),
rocketmqCustomProperties.getPromotionTopic());
return fullDiscountVO;
@ -186,7 +186,7 @@ public class FullDiscountServiceImpl extends ServiceImpl<FullDiscountMapper, Ful
this.promotionGoodsService.removePromotionGoods(fullDiscount.getPromotionGoodsList(), PromotionTypeEnum.FULL_DISCOUNT);
}
this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR, fullDiscount.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.FULL_DISCOUNT.name() + fullDiscount.getId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.FULL_DISCOUNT.name() + fullDiscount.getId())),
rocketmqCustomProperties.getPromotionTopic());
return result;
}

View File

@ -2,7 +2,7 @@ package cn.lili.modules.promotion.serviceimpl;
import cn.hutool.core.bean.BeanUtil;
import cn.lili.common.trigger.util.DelayQueueTools;
import cn.lili.common.trigger.enums.DelayQueueType;
import cn.lili.common.trigger.enums.PromotionDelayTypeEnums;
import cn.lili.common.trigger.message.PromotionMessage;
import cn.lili.common.exception.ServiceException;
import cn.lili.common.trigger.interfaces.TimeTrigger;
@ -222,11 +222,10 @@ public class PintuanServiceImpl extends ServiceImpl<PintuanMapper, Pintuan> impl
promotionMessage,
pintuanVO.getStartTime().getTime(),
pintuan.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DateUtil.getDelayTime(pintuanVO.getStartTime().getTime()),
rocketmqCustomProperties.getPromotionTopic());
}
return result;
}
@ -390,10 +389,10 @@ public class PintuanServiceImpl extends ServiceImpl<PintuanMapper, Pintuan> impl
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR,
pintuan.getStartTime().getTime(),
promotionMessage,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
rocketmqCustomProperties.getPromotionTopic());
// 发送促销活动开始的延时任务
this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(pintuan.getStartTime().getTime()));
this.timeTrigger.addDelay(timeTriggerMsg);
}
/**
@ -405,7 +404,7 @@ public class PintuanServiceImpl extends ServiceImpl<PintuanMapper, Pintuan> impl
private void removePintuanGoodsFromEs(String id, Long originStartTime) {
this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR,
originStartTime,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.PINTUAN.name() + id)),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.PINTUAN.name() + id)),
rocketmqCustomProperties.getPromotionTopic());
}

View File

@ -2,7 +2,7 @@ package cn.lili.modules.promotion.serviceimpl;
import cn.hutool.core.util.StrUtil;
import cn.lili.common.trigger.util.DelayQueueTools;
import cn.lili.common.trigger.enums.DelayQueueType;
import cn.lili.common.trigger.enums.PromotionDelayTypeEnums;
import cn.lili.common.trigger.message.PromotionMessage;
import cn.lili.common.exception.ServiceException;
import cn.lili.common.trigger.interfaces.TimeTrigger;
@ -120,7 +120,7 @@ public class PointsGoodsServiceImpl extends ServiceImpl<PointsGoodsMapper, Point
promotionMessage,
pointsGoodsVO.getStartTime().getTime(),
pointsGoods.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DateUtil.getDelayTime(pointsGoods.getStartTime().getTime()),
rocketmqCustomProperties.getPromotionTopic());
}
@ -149,7 +149,7 @@ public class PointsGoodsServiceImpl extends ServiceImpl<PointsGoodsMapper, Point
this.goodsIndexService.deleteEsGoodsPromotionIndexByList(Collections.singletonList(pointsGoodsVO.getSkuId()), PromotionTypeEnum.POINTS_GOODS);
this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR,
pointsGoodsVO.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.POINTS_GOODS.name() + pointsGoodsVO.getId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.POINTS_GOODS.name() + pointsGoodsVO.getId())),
rocketmqCustomProperties.getPromotionTopic());
}
}
@ -169,7 +169,7 @@ public class PointsGoodsServiceImpl extends ServiceImpl<PointsGoodsMapper, Point
PointsGoodsVO pointsGoodsVO = this.checkExist(id);
this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR,
pointsGoodsVO.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.POINTS_GOODS.name() + pointsGoodsVO.getId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.POINTS_GOODS.name() + pointsGoodsVO.getId())),
rocketmqCustomProperties.getPromotionTopic());
skuIds.add(pointsGoodsVO.getSkuId());
}
@ -241,10 +241,10 @@ public class PointsGoodsServiceImpl extends ServiceImpl<PointsGoodsMapper, Point
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR,
promotionMessage.getStartTime().getTime(),
promotionMessage,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
rocketmqCustomProperties.getPromotionTopic());
// 发送促销活动开始的延时任务
this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(promotionMessage.getStartTime().getTime()));
this.timeTrigger.addDelay(timeTriggerMsg);
}
/**

View File

@ -1,7 +1,7 @@
package cn.lili.modules.promotion.serviceimpl;
import cn.lili.common.trigger.util.DelayQueueTools;
import cn.lili.common.trigger.enums.DelayQueueType;
import cn.lili.common.trigger.enums.PromotionDelayTypeEnums;
import cn.lili.common.trigger.message.PromotionMessage;
import cn.lili.common.exception.ServiceException;
import cn.lili.common.trigger.interfaces.TimeTrigger;
@ -159,7 +159,7 @@ public class SeckillServiceImpl extends ServiceImpl<SeckillMapper, Seckill> impl
promotionMessage,
seckill.getStartTime().getTime(),
seckillVO.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DateUtil.getDelayTime(seckillVO.getStartTime().getTime()),
rocketmqCustomProperties.getPromotionTopic());
}
@ -180,7 +180,7 @@ public class SeckillServiceImpl extends ServiceImpl<SeckillMapper, Seckill> impl
this.promotionGoodsService.update(promotionGoodsQueryWrapper);
this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR,
seckill.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.SECKILL.name() + seckill.getId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.SECKILL.name() + seckill.getId())),
rocketmqCustomProperties.getPromotionTopic());
} else {
throw new ServiceException("该限时抢购活动的状态不能删除");
@ -223,7 +223,7 @@ public class SeckillServiceImpl extends ServiceImpl<SeckillMapper, Seckill> impl
}
this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR,
seckillVO.getStartTime().getTime(),
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.SECKILL.name() + seckillVO.getId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.SECKILL.name() + seckillVO.getId())),
rocketmqCustomProperties.getPromotionTopic());
}
} else {
@ -245,10 +245,10 @@ public class SeckillServiceImpl extends ServiceImpl<SeckillMapper, Seckill> impl
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR,
seckill.getStartTime().getTime(),
promotionMessage,
DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())),
rocketmqCustomProperties.getPromotionTopic());
// 发送促销活动开始的延时任务
this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(seckill.getStartTime().getTime()));
this.timeTrigger.addDelay(timeTriggerMsg);
}
/**

View File

@ -1,35 +0,0 @@
package cn.lili.test.trigger;
import cn.lili.common.cache.Cache;
import cn.lili.common.trigger.interfaces.TimeTriggerExecutor;
import cn.lili.common.utils.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* TestTimeTrigger
*
* @author Chopper
* @version v1.0
* @since
* 2019-02-19 下午3:01
*/
@Component
public class TestTimeTrigger implements TimeTriggerExecutor {
public static String key = "rabbitmq_test_value";
@Autowired
private Cache cache;
/**
* 执行任务
*
* @param object 任务参数
*/
@Override
public void execute(Object object) {
System.out.println(DateUtil.toString(DateUtil.getDateline(), "yyyy-MM-dd HH:mm:ss"));
System.out.println(key + "===" + object);
cache.put(key, object);
}
}