From d0226611d7cb9c2e3f3b99aed760baf29fedd964 Mon Sep 17 00:00:00 2001 From: paulGao Date: Mon, 25 Apr 2022 17:34:24 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E7=94=9F=E6=88=90=E7=B4=A2=E5=BC=95=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java index 0a02ea2a..5338cac3 100644 --- a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java @@ -370,6 +370,7 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements goodsIndexRepository.deleteAll(); for (EsGoodsIndex goodsIndex : goodsIndexList) { try { + log.info("生成商品索引:{}", goodsIndex); addIndex(goodsIndex); resultMap.put(KEY_SUCCESS, resultMap.get(KEY_SUCCESS) + 1); } catch (Exception e) { From b2a8578f436a07f65f9bc79ad717f31e296de113 Mon Sep 17 00:00:00 2001 From: paulGao Date: Tue, 26 Apr 2022 11:36:28 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E6=9C=BA=E5=88=B6=E3=80=82=EF=BC=88=E7=9B=AE?= =?UTF-8?q?=E5=89=8D=E4=BD=9C=E7=94=A8=E4=BA=8Ees=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E5=88=A0=E9=99=A4=E6=97=B6=EF=BC=8C=E6=9C=89?= =?UTF-8?q?=E6=97=B6=E4=BC=9A=E5=87=BA=E7=8E=B0=E7=9A=84=E5=86=85=E5=AE=B9?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E5=86=B2=E7=AA=81=E9=97=AE=E9=A2=98=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lili/listener/GoodsMessageListener.java | 19 ++++--- .../common/aop/annotation/RetryOperation.java | 25 ++++++++++ .../common/aop/interceptor/RetryAspect.java | 50 +++++++++++++++++++ .../lili/common/exception/RetryException.java | 21 ++++++++ .../serviceimpl/EsGoodsIndexServiceImpl.java | 43 +++++++++------- 5 files changed, 132 insertions(+), 26 deletions(-) create mode 100644 framework/src/main/java/cn/lili/common/aop/annotation/RetryOperation.java create mode 100644 framework/src/main/java/cn/lili/common/aop/interceptor/RetryAspect.java create mode 100644 framework/src/main/java/cn/lili/common/exception/RetryException.java diff --git a/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java b/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java index 19a1bab4..7e9b9aa3 100644 --- a/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java +++ b/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java @@ -6,6 +6,8 @@ import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ClassLoaderUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import cn.lili.common.aop.annotation.RetryOperation; +import cn.lili.common.exception.RetryException; import cn.lili.event.GoodsCommentCompleteEvent; import cn.lili.modules.distribution.entity.dos.DistributionGoods; import cn.lili.modules.distribution.entity.dto.DistributionGoodsSearchParams; @@ -121,6 +123,7 @@ public class GoodsMessageListener implements RocketMQListener { private PromotionGoodsService promotionGoodsService; @Override + @RetryOperation public void onMessage(MessageExt messageExt) { switch (GoodsTagsEnum.valueOf(messageExt.getTags())) { @@ -137,7 +140,7 @@ public class GoodsMessageListener implements RocketMQListener { Goods goods = this.goodsService.getById(goodsId); updateGoodsIndex(goods); } catch (Exception e) { - log.error("生成商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("生成商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; case GENERATOR_STORE_GOODS_INDEX: @@ -145,7 +148,7 @@ public class GoodsMessageListener implements RocketMQListener { String storeId = new String(messageExt.getBody()); this.updateGoodsIndex(storeId); } catch (Exception e) { - log.error("生成店铺商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("生成店铺商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; case UPDATE_GOODS_INDEX_PROMOTIONS: @@ -171,7 +174,7 @@ public class GoodsMessageListener implements RocketMQListener { List goodsList = goodsService.queryListByParams(searchParams); this.updateGoodsIndex(goodsList); } catch (Exception e) { - log.error("更新商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("更新商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; case UPDATE_GOODS_INDEX_FIELD: @@ -184,7 +187,7 @@ public class GoodsMessageListener implements RocketMQListener { Map updateFields = updateIndexFields.get("updateFields", Map.class); goodsIndexService.updateIndex(queryFields, updateFields); } catch (Exception e) { - log.error("更新商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("更新商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; case RESET_GOODS_INDEX: @@ -193,7 +196,7 @@ public class GoodsMessageListener implements RocketMQListener { List goodsIndices = JSONUtil.toList(goodsIdsJsonStr, EsGoodsIndex.class); goodsIndexService.updateBulkIndex(goodsIndices); } catch (Exception e) { - log.error("重置商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("重置商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; //审核商品 @@ -219,7 +222,7 @@ public class GoodsMessageListener implements RocketMQListener { } } catch (Exception e) { - log.error("删除商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("删除商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; //规格删除 @@ -232,8 +235,10 @@ public class GoodsMessageListener implements RocketMQListener { try { String storeId = new String(messageExt.getBody()); goodsIndexService.deleteIndex(MapUtil.builder(new HashMap()).put("storeId", storeId).build()); + } catch (RetryException re) { + throw re; } catch (Exception e) { - log.error("删除店铺商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("删除店铺商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; //商品评价 diff --git a/framework/src/main/java/cn/lili/common/aop/annotation/RetryOperation.java b/framework/src/main/java/cn/lili/common/aop/annotation/RetryOperation.java new file mode 100644 index 00000000..d12ab403 --- /dev/null +++ b/framework/src/main/java/cn/lili/common/aop/annotation/RetryOperation.java @@ -0,0 +1,25 @@ +package cn.lili.common.aop.annotation; + +import java.lang.annotation.*; + +/** + * 异常重试注解 + * + * @author paulG + * @since 2022/4/26 + **/ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Inherited +public @interface RetryOperation { + /** + * 重试次数 + */ + int retryCount() default 3; + + /** + * 重试间隔 + */ + int waitSeconds() default 10; +} diff --git a/framework/src/main/java/cn/lili/common/aop/interceptor/RetryAspect.java b/framework/src/main/java/cn/lili/common/aop/interceptor/RetryAspect.java new file mode 100644 index 00000000..bc9173b6 --- /dev/null +++ b/framework/src/main/java/cn/lili/common/aop/interceptor/RetryAspect.java @@ -0,0 +1,50 @@ +package cn.lili.common.aop.interceptor; + +import cn.lili.common.aop.annotation.RetryOperation; +import cn.lili.common.exception.RetryException; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.springframework.stereotype.Component; + +/** + * @author paulG + * @since 2022/4/26 + **/ +@Aspect +@Component +@Slf4j +public class RetryAspect { + + + @Around(value = "@annotation(retryOperation)") + public Object retryOperation(ProceedingJoinPoint joinPoint, RetryOperation retryOperation) throws Throwable { + + Object response = null; + int retryCount = retryOperation.retryCount(); + int waitSeconds = retryOperation.waitSeconds(); + boolean successful = false; + + do { + try { + response = joinPoint.proceed(); + successful = true; + } catch (RetryException ex) { + log.info("Operation failed, retries remaining: {}", retryCount); + retryCount--; + if (retryCount < 0) { + successful = true; + log.error(ex.getMessage()); + } + if (waitSeconds > 0 && !successful) { + log.info("Waiting for {} second(s) before next retry", waitSeconds); + Thread.sleep(waitSeconds * 1000L); + } + } + } while (!successful); + + return response; + } + +} diff --git a/framework/src/main/java/cn/lili/common/exception/RetryException.java b/framework/src/main/java/cn/lili/common/exception/RetryException.java new file mode 100644 index 00000000..97ccb522 --- /dev/null +++ b/framework/src/main/java/cn/lili/common/exception/RetryException.java @@ -0,0 +1,21 @@ +package cn.lili.common.exception; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * 如需异常重试,则抛出此异常 + * + * @author paulG + * @since 2022/4/26 + **/ +@EqualsAndHashCode(callSuper = true) +@Data +public class RetryException extends RuntimeException { + + private static final long serialVersionUID = 7886918292771470846L; + + public RetryException(String message) { + super(message); + } +} diff --git a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java index 5338cac3..f8958231 100644 --- a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java @@ -12,6 +12,7 @@ import cn.lili.cache.Cache; import cn.lili.cache.CachePrefix; import cn.lili.common.enums.PromotionTypeEnum; import cn.lili.common.enums.ResultCode; +import cn.lili.common.exception.RetryException; import cn.lili.common.exception.ServiceException; import cn.lili.common.properties.RocketmqCustomProperties; import cn.lili.elasticsearch.BaseElasticsearchService; @@ -39,7 +40,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.assertj.core.util.IterableUtil; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.update.UpdateRequest; @@ -48,6 +48,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -268,7 +269,15 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements script.append("ctx._source.").append(entry.getKey()).append("=").append("'").append(entry.getValue()).append("'").append(";"); } update.setScript(new Script(script.toString())); - client.updateByQueryAsync(update, RequestOptions.DEFAULT, this.actionListener()); + update.setConflicts("proceed"); + try { + BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(update, RequestOptions.DEFAULT); + if (bulkByScrollResponse.getVersionConflicts() > 0) { + throw new RetryException("更新商品索引失败,es内容版本冲突"); + } + } catch (IOException e) { + log.error("更新商品索引异常", e); + } } /** @@ -305,13 +314,23 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements */ @Override public void deleteIndex(Map queryFields) { - NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); for (Map.Entry entry : queryFields.entrySet()) { boolQueryBuilder.filter(QueryBuilders.termsQuery(entry.getKey(), entry.getValue())); } - queryBuilder.withQuery(boolQueryBuilder); - this.restTemplate.delete(queryBuilder.build(), EsGoodsIndex.class); + + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + deleteByQueryRequest.setQuery(boolQueryBuilder); + deleteByQueryRequest.indices(getIndexName()); + deleteByQueryRequest.setConflicts("proceed"); + try { + BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); + if (bulkByScrollResponse.getVersionConflicts() > 0) { + throw new RetryException("删除索引失败,es内容版本冲突"); + } + } catch (IOException e) { + log.error("删除索引异常", e); + } } /** @@ -792,18 +811,4 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements index.setPromotionMapJson(JSONUtil.toJsonStr(goodsCurrentPromotionMap)); return index; } - - private ActionListener actionListener() { - return new ActionListener() { - @Override - public void onResponse(BulkByScrollResponse bulkByScrollResponse) { - log.info("UpdateByQueryResponse: {}", bulkByScrollResponse); - } - - @Override - public void onFailure(Exception e) { - log.error("UpdateByQueryRequestFailure: ", e); - } - }; - } } From 27112e35b3160bc8235dffca295b914ae69fd954 Mon Sep 17 00:00:00 2001 From: paulGao Date: Tue, 26 Apr 2022 11:37:48 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=90=9C=E7=B4=A2?= =?UTF-8?q?=E5=85=B3=E9=94=AE=E5=AD=97=E4=B8=8D=E8=83=BD=E4=B8=BA=E7=A9=BA?= =?UTF-8?q?=E6=A0=BC=E3=80=82=E4=BC=98=E5=8C=96=E7=AE=A1=E7=90=86=E7=AB=AF?= =?UTF-8?q?=E5=95=86=E5=93=81=E4=B8=8A=E4=B8=8B=E6=9E=B6=E6=97=B6=E6=B2=A1?= =?UTF-8?q?=E6=9C=89=E6=B8=85=E7=A9=BAgoods=E7=BC=93=E5=AD=98=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cn/lili/modules/goods/serviceimpl/GoodsServiceImpl.java | 3 +++ .../modules/search/serviceimpl/EsGoodsSearchServiceImpl.java | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsServiceImpl.java b/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsServiceImpl.java index 25792d2a..70fa8277 100644 --- a/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsServiceImpl.java @@ -363,6 +363,9 @@ public class GoodsServiceImpl extends ServiceImpl implements queryWrapper.in(Goods::getId, goodsIds); List goodsList = this.list(queryWrapper); for (Goods goods : goodsList) { + if (GoodsStatusEnum.DOWN.equals(goodsStatusEnum)) { + cache.remove(CachePrefix.GOODS.getPrefix() + goods.getId()); + } goodsSkuService.updateGoodsSkuStatus(goods); } if (GoodsStatusEnum.DOWN.equals(goodsStatusEnum)) { diff --git a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsSearchServiceImpl.java b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsSearchServiceImpl.java index d5d252b0..bee4efee 100644 --- a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsSearchServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsSearchServiceImpl.java @@ -88,7 +88,7 @@ public class EsGoodsSearchServiceImpl implements EsGoodsSearchService { if (!exists) { esGoodsIndexService.init(); } - if (CharSequenceUtil.isNotEmpty(searchDTO.getKeyword())) { + if (CharSequenceUtil.isNotBlank(searchDTO.getKeyword())) { cache.incrementScore(CachePrefix.HOT_WORD.getPrefix(), searchDTO.getKeyword()); } NativeSearchQueryBuilder searchQueryBuilder = createSearchQueryBuilder(searchDTO, pageVo);