diff --git a/consumer/src/main/java/cn/lili/trigger/AbstractDelayQueueListen.java b/consumer/src/main/java/cn/lili/trigger/AbstractDelayQueueListen.java new file mode 100644 index 00000000..6c539183 --- /dev/null +++ b/consumer/src/main/java/cn/lili/trigger/AbstractDelayQueueListen.java @@ -0,0 +1,94 @@ +package cn.lili.trigger; + +import cn.hutool.json.JSONUtil; +import cn.lili.common.cache.Cache; +import cn.lili.common.utils.ThreadPoolUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.DefaultTypedTuple; +import org.springframework.util.CollectionUtils; + +import javax.annotation.PostConstruct; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * 延时队列工厂 + * + * @author paulG + * @since 2020/11/7 + **/ +@Slf4j +public abstract class AbstractDelayQueueListen { + + @Autowired + private Cache cache; + + + /** + * 延时队列机器开始运作 + */ + private void startDelayQueueMachine() { + log.info("延时队列机器{}开始运作", setDelayQueueName()); + + // 监听redis队列 + while (true) { + try { + // 获取当前时间的时间戳 + long now = System.currentTimeMillis() / 1000; + // 获取当前时间前需要执行的任务列表 + Set tuples = cache.zRangeByScore(setDelayQueueName(), 0, now); + + // 如果任务不为空 + if (!CollectionUtils.isEmpty(tuples)) { + log.info("执行任务:{}", JSONUtil.toJsonStr(tuples)); + + for (DefaultTypedTuple tuple : tuples) { + String jobId = (String) tuple.getValue(); + // 移除缓存,如果移除成功则表示当前线程处理了延时任务,则执行延时任务 + Long num = cache.zRemove(setDelayQueueName(), jobId); + // 如果移除成功, 则执行 + if (num > 0) { + ThreadPoolUtil.execute(() -> invoke(jobId)); + } + } + } + + } catch (Exception e) { + log.error("处理延时任务发生异常,异常原因为{}", e.getMessage(), e); + } finally { + // 间隔一秒钟搞一次 + try { + TimeUnit.SECONDS.sleep(5L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + } + + } + + /** + * 最终执行的任务方法 + * + * @param jobId 任务id + */ + public abstract void invoke(String jobId); + + + /** + * 要实现延时队列的名字 + */ + public abstract String setDelayQueueName(); + + + /** + * 监听队列 + */ + @PostConstruct + public void init() { + new Thread(this::startDelayQueueMachine).start(); + } + +} diff --git a/consumer/src/main/java/cn/lili/trigger/TimeTriggerConsumer.java b/consumer/src/main/java/cn/lili/trigger/TimeTriggerConsumer.java index 32e816a5..f0910140 100644 --- a/consumer/src/main/java/cn/lili/trigger/TimeTriggerConsumer.java +++ b/consumer/src/main/java/cn/lili/trigger/TimeTriggerConsumer.java @@ -2,9 +2,8 @@ package cn.lili.trigger; import cn.hutool.json.JSONUtil; import cn.lili.common.cache.Cache; -import cn.lili.common.trigger.interfaces.TimeTriggerExecutor; import cn.lili.common.trigger.model.TimeTriggerMsg; -import cn.lili.common.trigger.util.TimeTriggerUtil; +import cn.lili.common.trigger.util.DelayQueueTools; import cn.lili.common.utils.SpringContextUtil; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -28,7 +27,7 @@ public class TimeTriggerConsumer implements RocketMQListener { @Override public void onMessage(TimeTriggerMsg timeTriggerMsg) { try { - String key = TimeTriggerUtil.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey()); + String key = DelayQueueTools.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey()); if (cache.get(key) == null) { log.info("执行器执行被取消:{} | 任务标识:{}", timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getUniqueKey()); diff --git a/framework/src/main/java/cn/lili/common/trigger/interfaces/TimeTriggerExecutor.java b/consumer/src/main/java/cn/lili/trigger/TimeTriggerExecutor.java similarity index 83% rename from framework/src/main/java/cn/lili/common/trigger/interfaces/TimeTriggerExecutor.java rename to consumer/src/main/java/cn/lili/trigger/TimeTriggerExecutor.java index f378852c..0a086060 100644 --- a/framework/src/main/java/cn/lili/common/trigger/interfaces/TimeTriggerExecutor.java +++ b/consumer/src/main/java/cn/lili/trigger/TimeTriggerExecutor.java @@ -1,4 +1,4 @@ -package cn.lili.common.trigger.interfaces; +package cn.lili.trigger; /** * 延时任务执行器接口 diff --git a/consumer/src/main/java/cn/lili/trigger/executor/PromotionTimeTriggerExecutor.java b/consumer/src/main/java/cn/lili/trigger/executor/PromotionTimeTriggerExecutor.java index 13d27854..211c9bde 100644 --- a/consumer/src/main/java/cn/lili/trigger/executor/PromotionTimeTriggerExecutor.java +++ b/consumer/src/main/java/cn/lili/trigger/executor/PromotionTimeTriggerExecutor.java @@ -1,13 +1,12 @@ package cn.lili.trigger.executor; import cn.hutool.json.JSONUtil; +import cn.lili.common.trigger.interfaces.TimeTrigger; +import cn.lili.trigger.TimeTriggerExecutor; import cn.lili.common.trigger.message.PintuanOrderMessage; import cn.lili.common.trigger.message.PromotionMessage; -import cn.lili.common.trigger.interfaces.TimeTrigger; -import cn.lili.common.trigger.interfaces.TimeTriggerExecutor; import cn.lili.common.trigger.model.TimeExecuteConstant; import cn.lili.common.trigger.model.TimeTriggerMsg; -import cn.lili.common.utils.DateUtil; import cn.lili.config.rocketmq.RocketmqCustomProperties; import cn.lili.modules.order.order.service.OrderService; import cn.lili.modules.promotion.entity.enums.PromotionStatusEnum; @@ -58,8 +57,10 @@ public class PromotionTimeTriggerExecutor implements TimeTriggerExecutor { // 结束时间(延时一分钟) long closeTime = promotionMessage.getEndTime().getTime() + 60000; TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR, closeTime, promotionMessage, uniqueKey, rocketmqCustomProperties.getPromotionTopic()); - timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(promotionMessage.getEndTime().getTime())); + //添加延时任务 + timeTrigger.addDelay(timeTriggerMsg); } else { + //不是开始,则修改活动状态 promotionService.updatePromotionStatus(promotionMessage); } return; diff --git a/consumer/src/main/java/cn/lili/trigger/listen/PromotionDelayQueueListen.java b/consumer/src/main/java/cn/lili/trigger/listen/PromotionDelayQueueListen.java new file mode 100644 index 00000000..08091c68 --- /dev/null +++ b/consumer/src/main/java/cn/lili/trigger/listen/PromotionDelayQueueListen.java @@ -0,0 +1,34 @@ +package cn.lili.trigger.listen; + +import cn.hutool.json.JSONUtil; +import cn.lili.common.trigger.enums.DelayQueueEnums; +import cn.lili.common.trigger.interfaces.TimeTrigger; +import cn.lili.common.trigger.model.TimeTriggerMsg; +import cn.lili.trigger.AbstractDelayQueueListen; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * PromotionTimeTriggerListen + * + * @author Chopper + * @version v1.0 + * 2021-06-11 10:47 + */ +@Component +public class PromotionDelayQueueListen extends AbstractDelayQueueListen { + + @Autowired + private TimeTrigger timeTrigger; + + @Override + public void invoke(String jobId) { + timeTrigger.execute(JSONUtil.toBean(jobId, TimeTriggerMsg.class)); + } + + + @Override + public String setDelayQueueName() { + return DelayQueueEnums.PROMOTION_QUEUE.name(); + } +} diff --git a/framework/src/main/java/cn/lili/common/trigger/delay/AbstractDelayQueueMachineFactory.java b/framework/src/main/java/cn/lili/common/trigger/delay/AbstractDelayQueueMachineFactory.java index 60da4a6d..a2018a92 100644 --- a/framework/src/main/java/cn/lili/common/trigger/delay/AbstractDelayQueueMachineFactory.java +++ b/framework/src/main/java/cn/lili/common/trigger/delay/AbstractDelayQueueMachineFactory.java @@ -1,17 +1,9 @@ package cn.lili.common.trigger.delay; -import cn.hutool.json.JSONUtil; import cn.lili.common.cache.Cache; -import cn.lili.common.utils.ThreadPoolUtil; +import cn.lili.common.utils.DateUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.DefaultTypedTuple; -import org.springframework.util.CollectionUtils; - -import javax.annotation.PostConstruct; -import java.util.Calendar; -import java.util.Set; -import java.util.concurrent.TimeUnit; /** * 延时队列工厂 @@ -28,83 +20,25 @@ public abstract class AbstractDelayQueueMachineFactory { /** * 插入任务id * - * @param jobId 任务id(队列内唯一) - * @param time 延时时间(单位 :秒) + * @param jobId 任务id(队列内唯一) + * @param triggerTime 执行时间 时间戳(毫秒) * @return 是否插入成功 */ - public boolean addJob(String jobId, Integer time) { - //获取时间 - Calendar instance = Calendar.getInstance(); - instance.add(Calendar.SECOND, time); - long delaySeconds = instance.getTimeInMillis() / 1000; + public boolean addJob(String jobId, Long triggerTime) { + + //redis 中排序时间 + long delaySeconds = triggerTime / 1000; //增加延时任务 参数依次为:队列名称、执行时间、任务id boolean result = cache.zAdd(setDelayQueueName(), delaySeconds, jobId); - log.info("增加延时任务, 缓存key {}, 等待时间 {}", setDelayQueueName(), time); + log.info("增加延时任务, 缓存key {}, 执行时间 {},任务id {}", setDelayQueueName(), DateUtil.toString(triggerTime), jobId); return result; } - /** - * 延时队列机器开始运作 - */ - private void startDelayQueueMachine() { - log.info("延时队列机器{}开始运作", setDelayQueueName()); - - // 监听redis队列 - while (true) { - try { - // 获取当前时间的时间戳 - long now = System.currentTimeMillis() / 1000; - // 获取当前时间前的任务列表 - Set tuples = cache.zRangeByScore(setDelayQueueName(), 0, now); - - // 如果任务不为空 - if (!CollectionUtils.isEmpty(tuples)) { - log.info("执行任务:{}", JSONUtil.toJsonStr(tuples)); - - for (DefaultTypedTuple tuple : tuples) { - String jobId = (String) tuple.getValue(); - // 移除缓存,如果移除成功则表示当前线程处理了延时任务,则执行延时任务 - Long num = cache.zRemove(setDelayQueueName(), jobId); - // 如果移除成功, 则执行 - if (num > 0) { - ThreadPoolUtil.execute(() -> invoke(jobId)); - } - } - } - - } catch (Exception e) { - log.error("处理延时任务发生异常,异常原因为{}", e.getMessage(), e); - } finally { - // 间隔一秒钟搞一次 - try { - TimeUnit.SECONDS.sleep(5L); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - } - } - - } - - /** - * 最终执行的任务方法 - * - * @param jobId 任务id - */ - public abstract void invoke(String jobId); - /** * 要实现延时队列的名字 */ public abstract String setDelayQueueName(); - - @PostConstruct - public void init() { - new Thread(this::startDelayQueueMachine).start(); - } - } diff --git a/framework/src/main/java/cn/lili/common/trigger/delay/PromotionDelayQueue.java b/framework/src/main/java/cn/lili/common/trigger/delay/queue/PromotionDelayQueue.java similarity index 63% rename from framework/src/main/java/cn/lili/common/trigger/delay/PromotionDelayQueue.java rename to framework/src/main/java/cn/lili/common/trigger/delay/queue/PromotionDelayQueue.java index 792e77eb..2a072375 100644 --- a/framework/src/main/java/cn/lili/common/trigger/delay/PromotionDelayQueue.java +++ b/framework/src/main/java/cn/lili/common/trigger/delay/queue/PromotionDelayQueue.java @@ -1,8 +1,8 @@ -package cn.lili.common.trigger.delay; +package cn.lili.common.trigger.delay.queue; -import cn.hutool.json.JSONUtil; +import cn.lili.common.trigger.delay.AbstractDelayQueueMachineFactory; +import cn.lili.common.trigger.enums.DelayQueueEnums; import cn.lili.common.trigger.interfaces.TimeTrigger; -import cn.lili.common.trigger.model.TimeTriggerMsg; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -21,13 +21,8 @@ public class PromotionDelayQueue extends AbstractDelayQueueMachineFactory { @Autowired private TimeTrigger timeTrigger; - @Override - public void invoke(String jobId) { - timeTrigger.add(JSONUtil.toBean(jobId, TimeTriggerMsg.class)); - } - @Override public String setDelayQueueName() { - return "promotion_delay"; + return DelayQueueEnums.PROMOTION_QUEUE.name(); } } diff --git a/framework/src/main/java/cn/lili/common/trigger/enums/DelayQueueEnums.java b/framework/src/main/java/cn/lili/common/trigger/enums/DelayQueueEnums.java new file mode 100644 index 00000000..6bfc8618 --- /dev/null +++ b/framework/src/main/java/cn/lili/common/trigger/enums/DelayQueueEnums.java @@ -0,0 +1,19 @@ +package cn.lili.common.trigger.enums; + +/** + * 队列枚举 + */ +public enum DelayQueueEnums { + + + /** + * 促销任务队列 + */ + PROMOTION_QUEUE("促销任务队列"); + + private String description; + + DelayQueueEnums(String description) { + this.description = description; + } +} diff --git a/framework/src/main/java/cn/lili/common/trigger/enums/DelayQueueType.java b/framework/src/main/java/cn/lili/common/trigger/enums/PromotionDelayTypeEnums.java similarity index 54% rename from framework/src/main/java/cn/lili/common/trigger/enums/DelayQueueType.java rename to framework/src/main/java/cn/lili/common/trigger/enums/PromotionDelayTypeEnums.java index 6556f9fd..cf9ea551 100644 --- a/framework/src/main/java/cn/lili/common/trigger/enums/DelayQueueType.java +++ b/framework/src/main/java/cn/lili/common/trigger/enums/PromotionDelayTypeEnums.java @@ -5,8 +5,8 @@ package cn.lili.common.trigger.enums; * * @author paulG * @since 2021/5/7 - **/ -public enum DelayQueueType { + */ +public enum PromotionDelayTypeEnums { /** * 促销活动 @@ -17,14 +17,10 @@ public enum DelayQueueType { */ PINTUAN_ORDER("拼团订单"); - private final String description; + private String description; - DelayQueueType(String des) { - this.description = des; - } - - public String description() { - return this.description; + PromotionDelayTypeEnums(String description) { + this.description = description; } } diff --git a/framework/src/main/java/cn/lili/common/trigger/interfaces/TimeTrigger.java b/framework/src/main/java/cn/lili/common/trigger/interfaces/TimeTrigger.java index 9298b1ec..820314ae 100644 --- a/framework/src/main/java/cn/lili/common/trigger/interfaces/TimeTrigger.java +++ b/framework/src/main/java/cn/lili/common/trigger/interfaces/TimeTrigger.java @@ -9,22 +9,20 @@ import cn.lili.common.trigger.model.TimeTriggerMsg; */ public interface TimeTrigger { - /** * 添加延时任务 * * @param timeTriggerMsg 延时任务信息 */ - void add(TimeTriggerMsg timeTriggerMsg); + void addDelay(TimeTriggerMsg timeTriggerMsg); /** - * 添加延时任务 + * 执行延时任务 * * @param timeTriggerMsg 延时任务信息 - * @param delayTime 延时时间(秒) */ - void addDelay(TimeTriggerMsg timeTriggerMsg, int delayTime); + void execute(TimeTriggerMsg timeTriggerMsg); /** * 修改延时任务 diff --git a/framework/src/main/java/cn/lili/common/trigger/RocketmqTimerTrigger.java b/framework/src/main/java/cn/lili/common/trigger/interfaces/impl/RocketmqTimerTrigger.java similarity index 74% rename from framework/src/main/java/cn/lili/common/trigger/RocketmqTimerTrigger.java rename to framework/src/main/java/cn/lili/common/trigger/interfaces/impl/RocketmqTimerTrigger.java index 009e0f5f..b6589bc1 100644 --- a/framework/src/main/java/cn/lili/common/trigger/RocketmqTimerTrigger.java +++ b/framework/src/main/java/cn/lili/common/trigger/interfaces/impl/RocketmqTimerTrigger.java @@ -1,12 +1,12 @@ -package cn.lili.common.trigger; +package cn.lili.common.trigger.interfaces.impl; import cn.hutool.json.JSONUtil; import cn.lili.common.cache.Cache; import cn.lili.common.rocketmq.RocketmqSendCallbackBuilder; -import cn.lili.common.trigger.delay.PromotionDelayQueue; +import cn.lili.common.trigger.delay.queue.PromotionDelayQueue; import cn.lili.common.trigger.interfaces.TimeTrigger; import cn.lili.common.trigger.model.TimeTriggerMsg; -import cn.lili.common.trigger.util.TimeTriggerUtil; +import cn.lili.common.trigger.util.DelayQueueTools; import cn.lili.common.utils.DateUtil; import cn.lili.common.utils.StringUtils; import lombok.extern.slf4j.Slf4j; @@ -34,12 +34,38 @@ public class RocketmqTimerTrigger implements TimeTrigger { @Override - public void add(TimeTriggerMsg timeTriggerMsg) { - this.addExecute(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getParam(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey(), timeTriggerMsg.getTopic()); + public void addDelay(TimeTriggerMsg timeTriggerMsg) { + //执行器唯一key + String uniqueKey = timeTriggerMsg.getUniqueKey(); + if (StringUtils.isEmpty(uniqueKey)) { + uniqueKey = StringUtils.getRandStr(10); + } + //执行任务key + String generateKey = DelayQueueTools.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), uniqueKey); + this.cache.put(generateKey, 1); + //设置延时任务 + if (Boolean.TRUE.equals(promotionDelayQueue.addJob(JSONUtil.toJsonStr(timeTriggerMsg), timeTriggerMsg.getTriggerTime()))) { + log.info("延时任务标识: {}", generateKey); + log.info("定时执行在【" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "】,消费【" + timeTriggerMsg.getParam().toString() + "】"); + } else { + log.error("延时任务添加失败:{}", timeTriggerMsg); + } + } + + @Override + public void execute(TimeTriggerMsg timeTriggerMsg) { + this.addExecute(timeTriggerMsg.getTriggerExecutor(), + timeTriggerMsg.getParam(), + timeTriggerMsg.getTriggerTime(), + timeTriggerMsg.getUniqueKey(), + timeTriggerMsg.getTopic() + ); } /** - * 添加延时任务 + * 将任务添加到mq,mq异步队列执行。 + *

+ * 本系统中redis相当于延时任务吊起机制,而mq才是实际的业务消费,执行任务的存在 * * @param executorName 执行器beanId * @param param 执行参数 @@ -50,7 +76,7 @@ public class RocketmqTimerTrigger implements TimeTrigger { * 业务内全局唯一 * @param topic rocketmq topic */ - public void addExecute(String executorName, Object param, Long triggerTime, String uniqueKey, String topic) { + private void addExecute(String executorName, Object param, Long triggerTime, String uniqueKey, String topic) { TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executorName, triggerTime, param, uniqueKey, topic); Message message = MessageBuilder.withPayload(timeTriggerMsg).build(); @@ -58,34 +84,15 @@ public class RocketmqTimerTrigger implements TimeTrigger { this.rocketMQTemplate.asyncSend(topic, message, RocketmqSendCallbackBuilder.commonCallback()); } - @Override - public void addDelay(TimeTriggerMsg timeTriggerMsg, int delayTime) { - //执行器唯一key - String uniqueKey = timeTriggerMsg.getUniqueKey(); - if (StringUtils.isEmpty(uniqueKey)) { - uniqueKey = StringUtils.getRandStr(10); - } - //执行任务key - String generateKey = TimeTriggerUtil.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), uniqueKey); - this.cache.put(generateKey, 1); - //设置延时任务 - if (Boolean.TRUE.equals(promotionDelayQueue.addJob(JSONUtil.toJsonStr(timeTriggerMsg), delayTime))) { - log.info("add Redis key {}", generateKey); - log.info("定时执行在【" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "】,消费【" + timeTriggerMsg.getParam().toString() + "】"); - } else { - log.error("延时任务添加失败:{}", timeTriggerMsg); - } - } - @Override public void edit(String executorName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey, int delayTime, String topic) { this.delete(executorName, oldTriggerTime, uniqueKey, topic); - this.addDelay(new TimeTriggerMsg(executorName, triggerTime, param, uniqueKey, topic), delayTime); + this.addDelay(new TimeTriggerMsg(executorName, triggerTime, param, uniqueKey, topic)); } @Override public void delete(String executorName, Long triggerTime, String uniqueKey, String topic) { - String generateKey = TimeTriggerUtil.generateKey(executorName, triggerTime, uniqueKey); + String generateKey = DelayQueueTools.generateKey(executorName, triggerTime, uniqueKey); log.info("删除延时任务{}", generateKey); this.cache.remove(generateKey); } diff --git a/framework/src/main/java/cn/lili/common/trigger/util/DelayQueueTools.java b/framework/src/main/java/cn/lili/common/trigger/util/DelayQueueTools.java index 640719f9..e5055eda 100644 --- a/framework/src/main/java/cn/lili/common/trigger/util/DelayQueueTools.java +++ b/framework/src/main/java/cn/lili/common/trigger/util/DelayQueueTools.java @@ -1,6 +1,6 @@ package cn.lili.common.trigger.util; -import cn.lili.common.trigger.enums.DelayQueueType; +import cn.lili.common.trigger.enums.PromotionDelayTypeEnums; /** * 延时任务工具类 @@ -10,6 +10,11 @@ import cn.lili.common.trigger.enums.DelayQueueType; **/ public class DelayQueueTools { + /** + * 前缀 + */ + private static final String PREFIX = "{rocketmq_trigger}_"; + /** * 组装延时任务唯一键 * @@ -17,8 +22,21 @@ public class DelayQueueTools { * @param id id * @return 唯一键 */ - public static String wrapperUniqueKey(DelayQueueType type, String id) { + public static String wrapperUniqueKey(PromotionDelayTypeEnums type, String id) { return "{TIME_TRIGGER_" + type.name() + "}_" + id; } + + /** + * 生成延时任务标识key + * + * @param executorName 执行器beanId + * @param triggerTime 执行时间 + * @param uniqueKey 自定义表示 + * @return 延时任务标识key + */ + public static String generateKey(String executorName, Long triggerTime, String uniqueKey) { + return PREFIX + (executorName + triggerTime + uniqueKey).hashCode(); + } + } diff --git a/framework/src/main/java/cn/lili/common/trigger/util/TimeTriggerUtil.java b/framework/src/main/java/cn/lili/common/trigger/util/TimeTriggerUtil.java deleted file mode 100644 index 7475c10d..00000000 --- a/framework/src/main/java/cn/lili/common/trigger/util/TimeTriggerUtil.java +++ /dev/null @@ -1,28 +0,0 @@ -package cn.lili.common.trigger.util; - -/** - * 延时任务mq实现内容,提供加密算法以及任务前缀参数 - * - * @author Chopper - */ -public class TimeTriggerUtil { - - /** - * 前缀 - */ - private static final String PREFIX = "{rocketmq_trigger}_"; - - /** - * 生成延时任务标识key - * - * @param executorName 执行器beanId - * @param triggerTime 执行时间 - * @param uniqueKey 自定义表示 - * @return 延时任务标识key - */ - public static String generateKey(String executorName, Long triggerTime, String uniqueKey) { - return PREFIX + (executorName + triggerTime + uniqueKey).hashCode(); - } - - -} diff --git a/framework/src/main/java/cn/lili/common/utils/DateUtil.java b/framework/src/main/java/cn/lili/common/utils/DateUtil.java index c6ed8b0b..cb86f07c 100644 --- a/framework/src/main/java/cn/lili/common/utils/DateUtil.java +++ b/framework/src/main/java/cn/lili/common/utils/DateUtil.java @@ -200,6 +200,15 @@ public class DateUtil { public static String toString(Date date) { return toString(date,STANDARD_FORMAT); } + /** + * 把日期转换成字符串型 + * + * @param Long 日期 + * @return + */ + public static String toString(Long date) { + return toString(date,STANDARD_FORMAT); + } /** * 把日期转换成字符串型 * diff --git a/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsSkuServiceImpl.java b/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsSkuServiceImpl.java index 17ca7d27..1a8c7788 100644 --- a/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsSkuServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsSkuServiceImpl.java @@ -39,6 +39,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.lang.reflect.Field; import java.util.*; import java.util.stream.Collectors; @@ -194,9 +195,7 @@ public class GoodsSkuServiceImpl extends ServiceImpl i if (goodsSku == null) { throw new ServiceException("商品已下架"); } - } - // 获取当前商品的索引信息 EsGoodsIndex goodsIndex = goodsIndexService.findById(skuId); if (goodsIndex == null) { diff --git a/framework/src/main/java/cn/lili/modules/order/order/serviceimpl/OrderServiceImpl.java b/framework/src/main/java/cn/lili/modules/order/order/serviceimpl/OrderServiceImpl.java index 6450a219..c7178617 100644 --- a/framework/src/main/java/cn/lili/modules/order/order/serviceimpl/OrderServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/order/order/serviceimpl/OrderServiceImpl.java @@ -6,7 +6,7 @@ import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.json.JSONUtil; import cn.lili.common.aop.syslog.annotation.SystemLogPoint; import cn.lili.common.trigger.util.DelayQueueTools; -import cn.lili.common.trigger.enums.DelayQueueType; +import cn.lili.common.trigger.enums.PromotionDelayTypeEnums; import cn.lili.common.trigger.message.PintuanOrderMessage; import cn.lili.common.enums.ResultCode; import cn.lili.common.exception.ServiceException; @@ -555,9 +555,9 @@ public class OrderServiceImpl extends ServiceImpl implements TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR, startTime, pintuanOrderMessage, - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PINTUAN_ORDER, (pintuanId + parentOrderSn)), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PINTUAN_ORDER, (pintuanId + parentOrderSn)), rocketmqCustomProperties.getPromotionTopic()); - this.timeTrigger.addDelay(timeTriggerMsg, cn.lili.common.utils.DateUtil.getDelayTime(startTime)); + this.timeTrigger.addDelay(timeTriggerMsg); } //拼团所需人数,小于等于 参团后的人数,则说明成团,所有订单成团 if (pintuan.getRequiredNum() <= count) { diff --git a/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/CouponServiceImpl.java b/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/CouponServiceImpl.java index 3a0654c2..6a7fd73c 100644 --- a/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/CouponServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/CouponServiceImpl.java @@ -2,7 +2,7 @@ package cn.lili.modules.promotion.serviceimpl; import cn.hutool.core.text.CharSequenceUtil; import cn.lili.common.trigger.util.DelayQueueTools; -import cn.lili.common.trigger.enums.DelayQueueType; +import cn.lili.common.trigger.enums.PromotionDelayTypeEnums; import cn.lili.common.trigger.message.PromotionMessage; import cn.lili.common.exception.ServiceException; import cn.lili.common.trigger.interfaces.TimeTrigger; @@ -53,7 +53,6 @@ import java.util.stream.Collectors; */ @Service @Transactional(rollbackFor = Exception.class) - public class CouponServiceImpl extends ServiceImpl implements CouponService { //延时任务 @@ -94,10 +93,10 @@ public class CouponServiceImpl extends ServiceImpl impleme TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR, coupon.getStartTime().getTime(), promotionMessage, - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), rocketmqCustomProperties.getPromotionTopic()); // 发送促销活动开始的延时任务 - this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(coupon.getStartTime().getTime())); + this.timeTrigger.addDelay(timeTriggerMsg); return coupon; } @@ -117,7 +116,7 @@ public class CouponServiceImpl extends ServiceImpl impleme this.timeTrigger.edit(TimeExecuteConstant.PROMOTION_EXECUTOR, promotionMessage, coupon.getStartTime().getTime(), couponVO.getStartTime().getTime(), - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), DateUtil.getDelayTime(couponVO.getStartTime().getTime()), rocketmqCustomProperties.getPromotionTopic()); return couponVO; @@ -145,7 +144,7 @@ public class CouponServiceImpl extends ServiceImpl impleme this.timeTrigger.edit(TimeExecuteConstant.PROMOTION_EXECUTOR, promotionMessage, couponVO.getStartTime().getTime(), couponVO.getStartTime().getTime(), - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), DateUtil.getDelayTime(couponVO.getStartTime().getTime()), rocketmqCustomProperties.getPromotionTopic()); } @@ -167,7 +166,7 @@ public class CouponServiceImpl extends ServiceImpl impleme this.mongoTemplate.remove(new Query().addCriteria(Criteria.where("id").is(id)), CouponVO.class); this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR, couponVO.getStartTime().getTime(), - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.COUPON.name() + couponVO.getId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.COUPON.name() + couponVO.getId())), rocketmqCustomProperties.getPromotionTopic()); return result; } diff --git a/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/FullDiscountServiceImpl.java b/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/FullDiscountServiceImpl.java index e997dc13..9a6f3260 100644 --- a/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/FullDiscountServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/FullDiscountServiceImpl.java @@ -1,7 +1,7 @@ package cn.lili.modules.promotion.serviceimpl; import cn.lili.common.trigger.util.DelayQueueTools; -import cn.lili.common.trigger.enums.DelayQueueType; +import cn.lili.common.trigger.enums.PromotionDelayTypeEnums; import cn.lili.common.trigger.message.PromotionMessage; import cn.lili.common.exception.ServiceException; import cn.lili.common.trigger.interfaces.TimeTrigger; @@ -103,10 +103,10 @@ public class FullDiscountServiceImpl extends ServiceImpl impl promotionMessage, pintuanVO.getStartTime().getTime(), pintuan.getStartTime().getTime(), - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), DateUtil.getDelayTime(pintuanVO.getStartTime().getTime()), rocketmqCustomProperties.getPromotionTopic()); } - return result; } @@ -390,10 +389,10 @@ public class PintuanServiceImpl extends ServiceImpl impl TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR, pintuan.getStartTime().getTime(), promotionMessage, - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), rocketmqCustomProperties.getPromotionTopic()); // 发送促销活动开始的延时任务 - this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(pintuan.getStartTime().getTime())); + this.timeTrigger.addDelay(timeTriggerMsg); } /** @@ -405,7 +404,7 @@ public class PintuanServiceImpl extends ServiceImpl impl private void removePintuanGoodsFromEs(String id, Long originStartTime) { this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR, originStartTime, - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.PINTUAN.name() + id)), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.PINTUAN.name() + id)), rocketmqCustomProperties.getPromotionTopic()); } diff --git a/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/PointsGoodsServiceImpl.java b/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/PointsGoodsServiceImpl.java index 7c078748..b9b83bf2 100644 --- a/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/PointsGoodsServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/promotion/serviceimpl/PointsGoodsServiceImpl.java @@ -2,7 +2,7 @@ package cn.lili.modules.promotion.serviceimpl; import cn.hutool.core.util.StrUtil; import cn.lili.common.trigger.util.DelayQueueTools; -import cn.lili.common.trigger.enums.DelayQueueType; +import cn.lili.common.trigger.enums.PromotionDelayTypeEnums; import cn.lili.common.trigger.message.PromotionMessage; import cn.lili.common.exception.ServiceException; import cn.lili.common.trigger.interfaces.TimeTrigger; @@ -120,7 +120,7 @@ public class PointsGoodsServiceImpl extends ServiceImpl impl promotionMessage, seckill.getStartTime().getTime(), seckillVO.getStartTime().getTime(), - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), DateUtil.getDelayTime(seckillVO.getStartTime().getTime()), rocketmqCustomProperties.getPromotionTopic()); } @@ -180,7 +180,7 @@ public class SeckillServiceImpl extends ServiceImpl impl this.promotionGoodsService.update(promotionGoodsQueryWrapper); this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR, seckill.getStartTime().getTime(), - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.SECKILL.name() + seckill.getId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.SECKILL.name() + seckill.getId())), rocketmqCustomProperties.getPromotionTopic()); } else { throw new ServiceException("该限时抢购活动的状态不能删除"); @@ -223,7 +223,7 @@ public class SeckillServiceImpl extends ServiceImpl impl } this.timeTrigger.delete(TimeExecuteConstant.PROMOTION_EXECUTOR, seckillVO.getStartTime().getTime(), - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (PromotionTypeEnum.SECKILL.name() + seckillVO.getId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (PromotionTypeEnum.SECKILL.name() + seckillVO.getId())), rocketmqCustomProperties.getPromotionTopic()); } } else { @@ -245,10 +245,10 @@ public class SeckillServiceImpl extends ServiceImpl impl TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR, seckill.getStartTime().getTime(), promotionMessage, - DelayQueueTools.wrapperUniqueKey(DelayQueueType.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), + DelayQueueTools.wrapperUniqueKey(PromotionDelayTypeEnums.PROMOTION, (promotionMessage.getPromotionType() + promotionMessage.getPromotionId())), rocketmqCustomProperties.getPromotionTopic()); // 发送促销活动开始的延时任务 - this.timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(seckill.getStartTime().getTime())); + this.timeTrigger.addDelay(timeTriggerMsg); } /** diff --git a/framework/src/test/java/cn/lili/test/trigger/TestTimeTrigger.java b/framework/src/test/java/cn/lili/test/trigger/TestTimeTrigger.java deleted file mode 100644 index 71c45ecb..00000000 --- a/framework/src/test/java/cn/lili/test/trigger/TestTimeTrigger.java +++ /dev/null @@ -1,35 +0,0 @@ -package cn.lili.test.trigger; - -import cn.lili.common.cache.Cache; -import cn.lili.common.trigger.interfaces.TimeTriggerExecutor; -import cn.lili.common.utils.DateUtil; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * TestTimeTrigger - * - * @author Chopper - * @version v1.0 - * @since - * 2019-02-19 下午3:01 - */ -@Component -public class TestTimeTrigger implements TimeTriggerExecutor { - - public static String key = "rabbitmq_test_value"; - @Autowired - private Cache cache; - - /** - * 执行任务 - * - * @param object 任务参数 - */ - @Override - public void execute(Object object) { - System.out.println(DateUtil.toString(DateUtil.getDateline(), "yyyy-MM-dd HH:mm:ss")); - System.out.println(key + "===" + object); - cache.put(key, object); - } -}