diff --git a/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java b/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java index 19a1bab4..7e9b9aa3 100644 --- a/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java +++ b/consumer/src/main/java/cn/lili/listener/GoodsMessageListener.java @@ -6,6 +6,8 @@ import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ClassLoaderUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import cn.lili.common.aop.annotation.RetryOperation; +import cn.lili.common.exception.RetryException; import cn.lili.event.GoodsCommentCompleteEvent; import cn.lili.modules.distribution.entity.dos.DistributionGoods; import cn.lili.modules.distribution.entity.dto.DistributionGoodsSearchParams; @@ -121,6 +123,7 @@ public class GoodsMessageListener implements RocketMQListener { private PromotionGoodsService promotionGoodsService; @Override + @RetryOperation public void onMessage(MessageExt messageExt) { switch (GoodsTagsEnum.valueOf(messageExt.getTags())) { @@ -137,7 +140,7 @@ public class GoodsMessageListener implements RocketMQListener { Goods goods = this.goodsService.getById(goodsId); updateGoodsIndex(goods); } catch (Exception e) { - log.error("生成商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("生成商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; case GENERATOR_STORE_GOODS_INDEX: @@ -145,7 +148,7 @@ public class GoodsMessageListener implements RocketMQListener { String storeId = new String(messageExt.getBody()); this.updateGoodsIndex(storeId); } catch (Exception e) { - log.error("生成店铺商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("生成店铺商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; case UPDATE_GOODS_INDEX_PROMOTIONS: @@ -171,7 +174,7 @@ public class GoodsMessageListener implements RocketMQListener { List goodsList = goodsService.queryListByParams(searchParams); this.updateGoodsIndex(goodsList); } catch (Exception e) { - log.error("更新商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("更新商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; case UPDATE_GOODS_INDEX_FIELD: @@ -184,7 +187,7 @@ public class GoodsMessageListener implements RocketMQListener { Map updateFields = updateIndexFields.get("updateFields", Map.class); goodsIndexService.updateIndex(queryFields, updateFields); } catch (Exception e) { - log.error("更新商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("更新商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; case RESET_GOODS_INDEX: @@ -193,7 +196,7 @@ public class GoodsMessageListener implements RocketMQListener { List goodsIndices = JSONUtil.toList(goodsIdsJsonStr, EsGoodsIndex.class); goodsIndexService.updateBulkIndex(goodsIndices); } catch (Exception e) { - log.error("重置商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("重置商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; //审核商品 @@ -219,7 +222,7 @@ public class GoodsMessageListener implements RocketMQListener { } } catch (Exception e) { - log.error("删除商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("删除商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; //规格删除 @@ -232,8 +235,10 @@ public class GoodsMessageListener implements RocketMQListener { try { String storeId = new String(messageExt.getBody()); goodsIndexService.deleteIndex(MapUtil.builder(new HashMap()).put("storeId", storeId).build()); + } catch (RetryException re) { + throw re; } catch (Exception e) { - log.error("删除店铺商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); + log.error("删除店铺商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); } break; //商品评价 diff --git a/framework/src/main/java/cn/lili/common/aop/annotation/RetryOperation.java b/framework/src/main/java/cn/lili/common/aop/annotation/RetryOperation.java new file mode 100644 index 00000000..d12ab403 --- /dev/null +++ b/framework/src/main/java/cn/lili/common/aop/annotation/RetryOperation.java @@ -0,0 +1,25 @@ +package cn.lili.common.aop.annotation; + +import java.lang.annotation.*; + +/** + * 异常重试注解 + * + * @author paulG + * @since 2022/4/26 + **/ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Inherited +public @interface RetryOperation { + /** + * 重试次数 + */ + int retryCount() default 3; + + /** + * 重试间隔 + */ + int waitSeconds() default 10; +} diff --git a/framework/src/main/java/cn/lili/common/aop/interceptor/RetryAspect.java b/framework/src/main/java/cn/lili/common/aop/interceptor/RetryAspect.java new file mode 100644 index 00000000..bc9173b6 --- /dev/null +++ b/framework/src/main/java/cn/lili/common/aop/interceptor/RetryAspect.java @@ -0,0 +1,50 @@ +package cn.lili.common.aop.interceptor; + +import cn.lili.common.aop.annotation.RetryOperation; +import cn.lili.common.exception.RetryException; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.springframework.stereotype.Component; + +/** + * @author paulG + * @since 2022/4/26 + **/ +@Aspect +@Component +@Slf4j +public class RetryAspect { + + + @Around(value = "@annotation(retryOperation)") + public Object retryOperation(ProceedingJoinPoint joinPoint, RetryOperation retryOperation) throws Throwable { + + Object response = null; + int retryCount = retryOperation.retryCount(); + int waitSeconds = retryOperation.waitSeconds(); + boolean successful = false; + + do { + try { + response = joinPoint.proceed(); + successful = true; + } catch (RetryException ex) { + log.info("Operation failed, retries remaining: {}", retryCount); + retryCount--; + if (retryCount < 0) { + successful = true; + log.error(ex.getMessage()); + } + if (waitSeconds > 0 && !successful) { + log.info("Waiting for {} second(s) before next retry", waitSeconds); + Thread.sleep(waitSeconds * 1000L); + } + } + } while (!successful); + + return response; + } + +} diff --git a/framework/src/main/java/cn/lili/common/exception/RetryException.java b/framework/src/main/java/cn/lili/common/exception/RetryException.java new file mode 100644 index 00000000..97ccb522 --- /dev/null +++ b/framework/src/main/java/cn/lili/common/exception/RetryException.java @@ -0,0 +1,21 @@ +package cn.lili.common.exception; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * 如需异常重试,则抛出此异常 + * + * @author paulG + * @since 2022/4/26 + **/ +@EqualsAndHashCode(callSuper = true) +@Data +public class RetryException extends RuntimeException { + + private static final long serialVersionUID = 7886918292771470846L; + + public RetryException(String message) { + super(message); + } +} diff --git a/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsServiceImpl.java b/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsServiceImpl.java index 25792d2a..70fa8277 100644 --- a/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/goods/serviceimpl/GoodsServiceImpl.java @@ -363,6 +363,9 @@ public class GoodsServiceImpl extends ServiceImpl implements queryWrapper.in(Goods::getId, goodsIds); List goodsList = this.list(queryWrapper); for (Goods goods : goodsList) { + if (GoodsStatusEnum.DOWN.equals(goodsStatusEnum)) { + cache.remove(CachePrefix.GOODS.getPrefix() + goods.getId()); + } goodsSkuService.updateGoodsSkuStatus(goods); } if (GoodsStatusEnum.DOWN.equals(goodsStatusEnum)) { diff --git a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java index 0a02ea2a..f8958231 100644 --- a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java @@ -12,6 +12,7 @@ import cn.lili.cache.Cache; import cn.lili.cache.CachePrefix; import cn.lili.common.enums.PromotionTypeEnum; import cn.lili.common.enums.ResultCode; +import cn.lili.common.exception.RetryException; import cn.lili.common.exception.ServiceException; import cn.lili.common.properties.RocketmqCustomProperties; import cn.lili.elasticsearch.BaseElasticsearchService; @@ -39,7 +40,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.assertj.core.util.IterableUtil; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.update.UpdateRequest; @@ -48,6 +48,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -268,7 +269,15 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements script.append("ctx._source.").append(entry.getKey()).append("=").append("'").append(entry.getValue()).append("'").append(";"); } update.setScript(new Script(script.toString())); - client.updateByQueryAsync(update, RequestOptions.DEFAULT, this.actionListener()); + update.setConflicts("proceed"); + try { + BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(update, RequestOptions.DEFAULT); + if (bulkByScrollResponse.getVersionConflicts() > 0) { + throw new RetryException("更新商品索引失败,es内容版本冲突"); + } + } catch (IOException e) { + log.error("更新商品索引异常", e); + } } /** @@ -305,13 +314,23 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements */ @Override public void deleteIndex(Map queryFields) { - NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); for (Map.Entry entry : queryFields.entrySet()) { boolQueryBuilder.filter(QueryBuilders.termsQuery(entry.getKey(), entry.getValue())); } - queryBuilder.withQuery(boolQueryBuilder); - this.restTemplate.delete(queryBuilder.build(), EsGoodsIndex.class); + + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + deleteByQueryRequest.setQuery(boolQueryBuilder); + deleteByQueryRequest.indices(getIndexName()); + deleteByQueryRequest.setConflicts("proceed"); + try { + BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); + if (bulkByScrollResponse.getVersionConflicts() > 0) { + throw new RetryException("删除索引失败,es内容版本冲突"); + } + } catch (IOException e) { + log.error("删除索引异常", e); + } } /** @@ -370,6 +389,7 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements goodsIndexRepository.deleteAll(); for (EsGoodsIndex goodsIndex : goodsIndexList) { try { + log.info("生成商品索引:{}", goodsIndex); addIndex(goodsIndex); resultMap.put(KEY_SUCCESS, resultMap.get(KEY_SUCCESS) + 1); } catch (Exception e) { @@ -791,18 +811,4 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements index.setPromotionMapJson(JSONUtil.toJsonStr(goodsCurrentPromotionMap)); return index; } - - private ActionListener actionListener() { - return new ActionListener() { - @Override - public void onResponse(BulkByScrollResponse bulkByScrollResponse) { - log.info("UpdateByQueryResponse: {}", bulkByScrollResponse); - } - - @Override - public void onFailure(Exception e) { - log.error("UpdateByQueryRequestFailure: ", e); - } - }; - } } diff --git a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsSearchServiceImpl.java b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsSearchServiceImpl.java index 179c8f6c..620d3bd8 100644 --- a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsSearchServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsSearchServiceImpl.java @@ -88,7 +88,7 @@ public class EsGoodsSearchServiceImpl implements EsGoodsSearchService { if (!exists) { esGoodsIndexService.init(); } - if (CharSequenceUtil.isNotEmpty(searchDTO.getKeyword())) { + if (CharSequenceUtil.isNotBlank(searchDTO.getKeyword())) { cache.incrementScore(CachePrefix.HOT_WORD.getPrefix(), searchDTO.getKeyword()); } NativeSearchQueryBuilder searchQueryBuilder = createSearchQueryBuilder(searchDTO, pageVo);