diff --git a/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java b/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java index 1ed1c737..692e3ade 100644 --- a/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java +++ b/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java @@ -8,6 +8,7 @@ import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.lili.common.aop.annotation.RetryOperation; import cn.lili.common.exception.RetryException; +import cn.lili.common.vo.PageVO; import cn.lili.event.GoodsCommentCompleteEvent; import cn.lili.modules.distribution.entity.dos.DistributionGoods; import cn.lili.modules.distribution.entity.dto.DistributionGoodsSearchParams; @@ -33,6 +34,8 @@ import cn.lili.modules.promotion.service.PromotionService; import cn.lili.modules.search.entity.dos.EsGoodsIndex; import cn.lili.modules.search.service.EsGoodsIndexService; import cn.lili.rocketmq.tags.GoodsTagsEnum; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -54,6 +57,8 @@ import java.util.stream.Collectors; @RocketMQMessageListener(topic = "${lili.data.rocketmq.goods-topic}", consumerGroup = "${lili.data.rocketmq.goods-group}") public class GoodsMessageListener implements RocketMQListener { + private static final int BATCH_SIZE = 10; + /** * ES商品 */ @@ -175,10 +180,8 @@ public class GoodsMessageListener implements RocketMQListener { try { String updateIndexFieldsJsonStr = new String(messageExt.getBody()); JSONObject updateIndexFields = JSONUtil.parseObj(updateIndexFieldsJsonStr); - @SuppressWarnings("unchecked") - Map queryFields = updateIndexFields.get("queryFields", Map.class); - @SuppressWarnings("unchecked") - Map updateFields = updateIndexFields.get("updateFields", Map.class); + @SuppressWarnings("unchecked") Map queryFields = updateIndexFields.get("queryFields", Map.class); + @SuppressWarnings("unchecked") Map updateFields = updateIndexFields.get("updateFields", Map.class); goodsIndexService.updateIndex(queryFields, updateFields); } catch (Exception e) { log.error("更新商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); @@ -238,10 +241,7 @@ public class GoodsMessageListener implements RocketMQListener { try { goodsCommentCompleteEvent.goodsComment(memberEvaluation); } catch (Exception e) { - log.error("评价{},在{}业务中,状态修改事件执行异常", - new String(messageExt.getBody()), - goodsCommentCompleteEvent.getClass().getName(), - e); + log.error("评价{},在{}业务中,状态修改事件执行异常", new String(messageExt.getBody()), goodsCommentCompleteEvent.getClass().getName(), e); } } break; @@ -260,26 +260,42 @@ public class GoodsMessageListener implements RocketMQListener { log.info("更新商品索引促销信息: {}", promotionsJsonStr); JSONObject jsonObject = JSONUtil.parseObj(promotionsJsonStr); // 转换为详细的促销信息(注:促销信息必须继承自 BasePromotions,且必须保证派生类存在与sdk包下) - BasePromotions promotions = (BasePromotions) jsonObject.get("promotions", - ClassLoaderUtil.loadClass(jsonObject.get("promotionsType").toString())); + BasePromotions promotions = (BasePromotions) jsonObject.get("promotions", ClassLoaderUtil.loadClass(jsonObject.get("promotionsType").toString())); // 获取促销唯一key,由 促销类型 + 促销id 组成 String esPromotionKey = jsonObject.get("esPromotionKey").toString(); if (PromotionsScopeTypeEnum.PORTION_GOODS.name().equals(promotions.getScopeType())) { - PromotionGoodsSearchParams searchParams = new PromotionGoodsSearchParams(); - searchParams.setPromotionId(promotions.getId()); - List promotionGoodsList = this.promotionGoodsService.listFindAll(searchParams); - List skuIds = promotionGoodsList.stream().map(PromotionGoods::getSkuId).collect(Collectors.toList()); - // 更新商品索引促销信息(删除原索引中相关的促销信息,更新索引中促销信息) - this.goodsIndexService.deleteEsGoodsPromotionByPromotionKey(skuIds, esPromotionKey); - this.goodsIndexService.updateEsGoodsIndexByList(promotionGoodsList, promotions, esPromotionKey); + for (int i = 0; ; i++) { + PromotionGoodsSearchParams searchParams = new PromotionGoodsSearchParams(); + searchParams.setPromotionId(promotions.getId()); + PageVO pageVO = new PageVO(); + pageVO.setPageNumber(i); + pageVO.setPageSize(BATCH_SIZE); + Page promotionGoodsPage = this.promotionGoodsService.pageFindAll(searchParams, pageVO); + if (promotionGoodsPage == null || promotionGoodsPage.getRecords().isEmpty()) { + break; + } + List skuIds = promotionGoodsPage.getRecords().stream().map(PromotionGoods::getSkuId).collect(Collectors.toList()); + // 更新商品索引促销信息(删除原索引中相关的促销信息,更新索引中促销信息) + this.goodsIndexService.deleteEsGoodsPromotionByPromotionKey(skuIds, esPromotionKey); + this.goodsIndexService.updateEsGoodsIndexByList(promotionGoodsPage.getRecords(), promotions, esPromotionKey); + } + } else if (PromotionsScopeTypeEnum.PORTION_GOODS_CATEGORY.name().equals(promotions.getScopeType())) { - GoodsSearchParams searchParams = new GoodsSearchParams(); - searchParams.setCategoryPath(promotions.getScopeId()); - List goodsSkuByList = this.goodsSkuService.getGoodsSkuByList(searchParams); - List skuIds = goodsSkuByList.stream().map(GoodsSku::getId).collect(Collectors.toList()); - // 更新商品索引促销信息(删除原索引中相关的促销信息,更新索引中促销信息) - this.goodsIndexService.deleteEsGoodsPromotionByPromotionKey(skuIds, esPromotionKey); - this.goodsIndexService.updateEsGoodsIndexPromotions(skuIds, promotions, esPromotionKey); + for (int i = 0; ; i++) { + GoodsSearchParams searchParams = new GoodsSearchParams(); + searchParams.setCategoryPath(promotions.getScopeId()); + searchParams.setPageNumber(i); + searchParams.setPageSize(BATCH_SIZE); + IPage goodsSkuByPage = this.goodsSkuService.getGoodsSkuByPage(searchParams); + if (goodsSkuByPage == null || goodsSkuByPage.getRecords().isEmpty()) { + break; + } + List skuIds = goodsSkuByPage.getRecords().stream().map(GoodsSku::getId).collect(Collectors.toList()); + // 更新商品索引促销信息(删除原索引中相关的促销信息,更新索引中促销信息) + this.goodsIndexService.deleteEsGoodsPromotionByPromotionKey(skuIds, esPromotionKey); + this.goodsIndexService.updateEsGoodsIndexPromotions(skuIds, promotions, esPromotionKey); + } + } else if (PromotionsScopeTypeEnum.ALL.name().equals(promotions.getScopeType())) { this.goodsIndexService.updateEsGoodsIndexAllByList(promotions, esPromotionKey); } @@ -321,26 +337,31 @@ public class GoodsMessageListener implements RocketMQListener { * @param goods 商品消息 */ private void updateGoodsIndex(Goods goods) { - //如果商品通过审核&&并且已上架 - GoodsSearchParams searchParams = new GoodsSearchParams(); - searchParams.setGoodsId(goods.getId()); - List goodsSkuList = this.goodsSkuService.getGoodsSkuByList(searchParams); - log.info("goods:{}", goods); - log.info("goodsSkuList:{}", goodsSkuList); - if (goods.getAuthFlag().equals(GoodsAuthEnum.PASS.name()) - && goods.getMarketEnable().equals(GoodsStatusEnum.UPPER.name()) - && Boolean.FALSE.equals(goods.getDeleteFlag())) { - this.generatorGoodsIndex(goods, goodsSkuList); - } - //如果商品状态值不支持es搜索,那么将商品信息做下架处理 - else { - for (GoodsSku goodsSku : goodsSkuList) { - EsGoodsIndex esGoodsOld = goodsIndexService.findById(goodsSku.getId()); - if (esGoodsOld != null) { - goodsIndexService.deleteIndexById(goodsSku.getId()); + for (int i = 1; ; i++) { + //如果商品通过审核&&并且已上架 + GoodsSearchParams searchParams = new GoodsSearchParams(); + searchParams.setGoodsId(goods.getId()); + searchParams.setPageNumber(i); + searchParams.setPageSize(BATCH_SIZE); + IPage goodsSkuByPage = this.goodsSkuService.getGoodsSkuByPage(searchParams); + if (goodsSkuByPage == null || goodsSkuByPage.getRecords().isEmpty()) { + break; + } + log.info("goods:{}", goods); + log.info("goodsSkuList:{}", goodsSkuByPage.getRecords()); + if (goods.getAuthFlag().equals(GoodsAuthEnum.PASS.name()) && goods.getMarketEnable().equals(GoodsStatusEnum.UPPER.name()) && Boolean.FALSE.equals(goods.getDeleteFlag())) { + this.generatorGoodsIndex(goods, goodsSkuByPage.getRecords()); + } else { + //如果商品状态值不支持es搜索,那么将商品信息做下架处理 + for (GoodsSku goodsSku : goodsSkuByPage.getRecords()) { + EsGoodsIndex esGoodsOld = goodsIndexService.findById(goodsSku.getId()); + if (esGoodsOld != null) { + goodsIndexService.deleteIndexById(goodsSku.getId()); + } } } } + } /** @@ -462,9 +483,7 @@ public class GoodsMessageListener implements RocketMQListener { goodsSku.setBuyCount(buyCount); goodsSkuService.update(goodsSku); - this.goodsIndexService.updateIndex( - MapUtil.builder(new HashMap()).put("id", goodsCompleteMessage.getSkuId()).build(), - MapUtil.builder(new HashMap()).put("buyCount", buyCount).build()); + this.goodsIndexService.updateIndex(MapUtil.builder(new HashMap()).put("id", goodsCompleteMessage.getSkuId()).build(), MapUtil.builder(new HashMap()).put("buyCount", buyCount).build()); } else { log.error("商品SkuId为[" + goodsCompleteMessage.getGoodsId() + "的商品不存在,更新商品失败!");