增加异常重试机制。(目前作用于es批量更新删除时,有时会出现的内容版本冲突问题)
This commit is contained in:
		
							parent
							
								
									d0226611d7
								
							
						
					
					
						commit
						b2a8578f43
					
				| @ -6,6 +6,8 @@ import cn.hutool.core.util.ArrayUtil; | ||||
| import cn.hutool.core.util.ClassLoaderUtil; | ||||
| import cn.hutool.json.JSONObject; | ||||
| import cn.hutool.json.JSONUtil; | ||||
| import cn.lili.common.aop.annotation.RetryOperation; | ||||
| import cn.lili.common.exception.RetryException; | ||||
| import cn.lili.event.GoodsCommentCompleteEvent; | ||||
| import cn.lili.modules.distribution.entity.dos.DistributionGoods; | ||||
| import cn.lili.modules.distribution.entity.dto.DistributionGoodsSearchParams; | ||||
| @ -121,6 +123,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> { | ||||
|     private PromotionGoodsService promotionGoodsService; | ||||
| 
 | ||||
|     @Override | ||||
|     @RetryOperation | ||||
|     public void onMessage(MessageExt messageExt) { | ||||
| 
 | ||||
|         switch (GoodsTagsEnum.valueOf(messageExt.getTags())) { | ||||
| @ -137,7 +140,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> { | ||||
|                     Goods goods = this.goodsService.getById(goodsId); | ||||
|                     updateGoodsIndex(goods); | ||||
|                 } catch (Exception e) { | ||||
|                     log.error("生成商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); | ||||
|                     log.error("生成商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); | ||||
|                 } | ||||
|                 break; | ||||
|             case GENERATOR_STORE_GOODS_INDEX: | ||||
| @ -145,7 +148,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> { | ||||
|                     String storeId = new String(messageExt.getBody()); | ||||
|                     this.updateGoodsIndex(storeId); | ||||
|                 } catch (Exception e) { | ||||
|                     log.error("生成店铺商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); | ||||
|                     log.error("生成店铺商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); | ||||
|                 } | ||||
|                 break; | ||||
|             case UPDATE_GOODS_INDEX_PROMOTIONS: | ||||
| @ -171,7 +174,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> { | ||||
|                     List<Goods> goodsList = goodsService.queryListByParams(searchParams); | ||||
|                     this.updateGoodsIndex(goodsList); | ||||
|                 } catch (Exception e) { | ||||
|                     log.error("更新商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); | ||||
|                     log.error("更新商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); | ||||
|                 } | ||||
|                 break; | ||||
|             case UPDATE_GOODS_INDEX_FIELD: | ||||
| @ -184,7 +187,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> { | ||||
|                     Map<String, Object> updateFields = updateIndexFields.get("updateFields", Map.class); | ||||
|                     goodsIndexService.updateIndex(queryFields, updateFields); | ||||
|                 } catch (Exception e) { | ||||
|                     log.error("更新商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); | ||||
|                     log.error("更新商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); | ||||
|                 } | ||||
|                 break; | ||||
|             case RESET_GOODS_INDEX: | ||||
| @ -193,7 +196,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> { | ||||
|                     List<EsGoodsIndex> goodsIndices = JSONUtil.toList(goodsIdsJsonStr, EsGoodsIndex.class); | ||||
|                     goodsIndexService.updateBulkIndex(goodsIndices); | ||||
|                 } catch (Exception e) { | ||||
|                     log.error("重置商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); | ||||
|                     log.error("重置商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); | ||||
|                 } | ||||
|                 break; | ||||
|             //审核商品 | ||||
| @ -219,7 +222,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> { | ||||
|                     } | ||||
| 
 | ||||
|                 } catch (Exception e) { | ||||
|                     log.error("删除商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); | ||||
|                     log.error("删除商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); | ||||
|                 } | ||||
|                 break; | ||||
|             //规格删除 | ||||
| @ -232,8 +235,10 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> { | ||||
|                 try { | ||||
|                     String storeId = new String(messageExt.getBody()); | ||||
|                     goodsIndexService.deleteIndex(MapUtil.builder(new HashMap<String, Object>()).put("storeId", storeId).build()); | ||||
|                 } catch (RetryException re) { | ||||
|                     throw re; | ||||
|                 } catch (Exception e) { | ||||
|                     log.error("删除店铺商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody())); | ||||
|                     log.error("删除店铺商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e); | ||||
|                 } | ||||
|                 break; | ||||
|             //商品评价 | ||||
|  | ||||
| @ -0,0 +1,25 @@ | ||||
| package cn.lili.common.aop.annotation; | ||||
| 
 | ||||
| import java.lang.annotation.*; | ||||
| 
 | ||||
| /** | ||||
|  * 异常重试注解 | ||||
|  * | ||||
|  * @author paulG | ||||
|  * @since 2022/4/26 | ||||
|  **/ | ||||
| @Target(ElementType.METHOD) | ||||
| @Retention(RetentionPolicy.RUNTIME) | ||||
| @Documented | ||||
| @Inherited | ||||
| public @interface RetryOperation { | ||||
|     /** | ||||
|      * 重试次数 | ||||
|      */ | ||||
|     int retryCount() default 3; | ||||
| 
 | ||||
|     /** | ||||
|      * 重试间隔 | ||||
|      */ | ||||
|     int waitSeconds() default 10; | ||||
| } | ||||
| @ -0,0 +1,50 @@ | ||||
| package cn.lili.common.aop.interceptor; | ||||
| 
 | ||||
| import cn.lili.common.aop.annotation.RetryOperation; | ||||
| import cn.lili.common.exception.RetryException; | ||||
| import lombok.extern.slf4j.Slf4j; | ||||
| import org.aspectj.lang.ProceedingJoinPoint; | ||||
| import org.aspectj.lang.annotation.Around; | ||||
| import org.aspectj.lang.annotation.Aspect; | ||||
| import org.springframework.stereotype.Component; | ||||
| 
 | ||||
| /** | ||||
|  * @author paulG | ||||
|  * @since 2022/4/26 | ||||
|  **/ | ||||
| @Aspect | ||||
| @Component | ||||
| @Slf4j | ||||
| public class RetryAspect { | ||||
| 
 | ||||
| 
 | ||||
|     @Around(value = "@annotation(retryOperation)") | ||||
|     public Object retryOperation(ProceedingJoinPoint joinPoint, RetryOperation retryOperation) throws Throwable { | ||||
| 
 | ||||
|         Object response = null; | ||||
|         int retryCount = retryOperation.retryCount(); | ||||
|         int waitSeconds = retryOperation.waitSeconds(); | ||||
|         boolean successful = false; | ||||
| 
 | ||||
|         do { | ||||
|             try { | ||||
|                 response = joinPoint.proceed(); | ||||
|                 successful = true; | ||||
|             } catch (RetryException ex) { | ||||
|                 log.info("Operation failed, retries remaining: {}", retryCount); | ||||
|                 retryCount--; | ||||
|                 if (retryCount < 0) { | ||||
|                     successful = true; | ||||
|                     log.error(ex.getMessage()); | ||||
|                 } | ||||
|                 if (waitSeconds > 0 && !successful) { | ||||
|                     log.info("Waiting for {} second(s) before next retry", waitSeconds); | ||||
|                     Thread.sleep(waitSeconds * 1000L); | ||||
|                 } | ||||
|             } | ||||
|         } while (!successful); | ||||
| 
 | ||||
|         return response; | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,21 @@ | ||||
| package cn.lili.common.exception; | ||||
| 
 | ||||
| import lombok.Data; | ||||
| import lombok.EqualsAndHashCode; | ||||
| 
 | ||||
| /** | ||||
|  * 如需异常重试,则抛出此异常 | ||||
|  * | ||||
|  * @author paulG | ||||
|  * @since 2022/4/26 | ||||
|  **/ | ||||
| @EqualsAndHashCode(callSuper = true) | ||||
| @Data | ||||
| public class RetryException extends RuntimeException { | ||||
| 
 | ||||
|     private static final long serialVersionUID = 7886918292771470846L; | ||||
| 
 | ||||
|     public RetryException(String message) { | ||||
|         super(message); | ||||
|     } | ||||
| } | ||||
| @ -12,6 +12,7 @@ import cn.lili.cache.Cache; | ||||
| import cn.lili.cache.CachePrefix; | ||||
| import cn.lili.common.enums.PromotionTypeEnum; | ||||
| import cn.lili.common.enums.ResultCode; | ||||
| import cn.lili.common.exception.RetryException; | ||||
| import cn.lili.common.exception.ServiceException; | ||||
| import cn.lili.common.properties.RocketmqCustomProperties; | ||||
| import cn.lili.elasticsearch.BaseElasticsearchService; | ||||
| @ -39,7 +40,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; | ||||
| import lombok.extern.slf4j.Slf4j; | ||||
| import org.apache.rocketmq.spring.core.RocketMQTemplate; | ||||
| import org.assertj.core.util.IterableUtil; | ||||
| import org.elasticsearch.action.ActionListener; | ||||
| import org.elasticsearch.action.bulk.BulkRequest; | ||||
| import org.elasticsearch.action.bulk.BulkResponse; | ||||
| import org.elasticsearch.action.update.UpdateRequest; | ||||
| @ -48,6 +48,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; | ||||
| import org.elasticsearch.index.query.QueryBuilders; | ||||
| import org.elasticsearch.index.query.TermQueryBuilder; | ||||
| import org.elasticsearch.index.reindex.BulkByScrollResponse; | ||||
| import org.elasticsearch.index.reindex.DeleteByQueryRequest; | ||||
| import org.elasticsearch.index.reindex.UpdateByQueryRequest; | ||||
| import org.elasticsearch.script.Script; | ||||
| import org.elasticsearch.script.ScriptType; | ||||
| @ -268,7 +269,15 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements | ||||
|             script.append("ctx._source.").append(entry.getKey()).append("=").append("'").append(entry.getValue()).append("'").append(";"); | ||||
|         } | ||||
|         update.setScript(new Script(script.toString())); | ||||
|         client.updateByQueryAsync(update, RequestOptions.DEFAULT, this.actionListener()); | ||||
|         update.setConflicts("proceed"); | ||||
|         try { | ||||
|             BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(update, RequestOptions.DEFAULT); | ||||
|             if (bulkByScrollResponse.getVersionConflicts() > 0) { | ||||
|                 throw new RetryException("更新商品索引失败,es内容版本冲突"); | ||||
|             } | ||||
|         } catch (IOException e) { | ||||
|             log.error("更新商品索引异常", e); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
| @ -305,13 +314,23 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements | ||||
|      */ | ||||
|     @Override | ||||
|     public void deleteIndex(Map<String, Object> queryFields) { | ||||
|         NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder(); | ||||
|         BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); | ||||
|         for (Map.Entry<String, Object> entry : queryFields.entrySet()) { | ||||
|             boolQueryBuilder.filter(QueryBuilders.termsQuery(entry.getKey(), entry.getValue())); | ||||
|         } | ||||
|         queryBuilder.withQuery(boolQueryBuilder); | ||||
|         this.restTemplate.delete(queryBuilder.build(), EsGoodsIndex.class); | ||||
| 
 | ||||
|         DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); | ||||
|         deleteByQueryRequest.setQuery(boolQueryBuilder); | ||||
|         deleteByQueryRequest.indices(getIndexName()); | ||||
|         deleteByQueryRequest.setConflicts("proceed"); | ||||
|         try { | ||||
|             BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); | ||||
|             if (bulkByScrollResponse.getVersionConflicts() > 0) { | ||||
|                 throw new RetryException("删除索引失败,es内容版本冲突"); | ||||
|             } | ||||
|         } catch (IOException e) { | ||||
|             log.error("删除索引异常", e); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
| @ -792,18 +811,4 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements | ||||
|         index.setPromotionMapJson(JSONUtil.toJsonStr(goodsCurrentPromotionMap)); | ||||
|         return index; | ||||
|     } | ||||
| 
 | ||||
|     private ActionListener<BulkByScrollResponse> actionListener() { | ||||
|         return new ActionListener<BulkByScrollResponse>() { | ||||
|             @Override | ||||
|             public void onResponse(BulkByScrollResponse bulkByScrollResponse) { | ||||
|                 log.info("UpdateByQueryResponse: {}", bulkByScrollResponse); | ||||
|             } | ||||
| 
 | ||||
|             @Override | ||||
|             public void onFailure(Exception e) { | ||||
|                 log.error("UpdateByQueryRequestFailure: ", e); | ||||
|             } | ||||
|         }; | ||||
|     } | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 paulGao
						paulGao