Merge branch 'master' of gitee.com:beijing_hongye_huicheng/lilishop
This commit is contained in:
commit
e9d5e705ed
@ -6,6 +6,8 @@ import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.ClassLoaderUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import cn.lili.common.aop.annotation.RetryOperation;
|
||||
import cn.lili.common.exception.RetryException;
|
||||
import cn.lili.event.GoodsCommentCompleteEvent;
|
||||
import cn.lili.modules.distribution.entity.dos.DistributionGoods;
|
||||
import cn.lili.modules.distribution.entity.dto.DistributionGoodsSearchParams;
|
||||
@ -121,6 +123,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
private PromotionGoodsService promotionGoodsService;
|
||||
|
||||
@Override
|
||||
@RetryOperation
|
||||
public void onMessage(MessageExt messageExt) {
|
||||
|
||||
switch (GoodsTagsEnum.valueOf(messageExt.getTags())) {
|
||||
@ -137,7 +140,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
Goods goods = this.goodsService.getById(goodsId);
|
||||
updateGoodsIndex(goods);
|
||||
} catch (Exception e) {
|
||||
log.error("生成商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody()));
|
||||
log.error("生成商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e);
|
||||
}
|
||||
break;
|
||||
case GENERATOR_STORE_GOODS_INDEX:
|
||||
@ -145,7 +148,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
String storeId = new String(messageExt.getBody());
|
||||
this.updateGoodsIndex(storeId);
|
||||
} catch (Exception e) {
|
||||
log.error("生成店铺商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody()));
|
||||
log.error("生成店铺商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e);
|
||||
}
|
||||
break;
|
||||
case UPDATE_GOODS_INDEX_PROMOTIONS:
|
||||
@ -171,7 +174,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
List<Goods> goodsList = goodsService.queryListByParams(searchParams);
|
||||
this.updateGoodsIndex(goodsList);
|
||||
} catch (Exception e) {
|
||||
log.error("更新商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody()));
|
||||
log.error("更新商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e);
|
||||
}
|
||||
break;
|
||||
case UPDATE_GOODS_INDEX_FIELD:
|
||||
@ -184,7 +187,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
Map<String, Object> updateFields = updateIndexFields.get("updateFields", Map.class);
|
||||
goodsIndexService.updateIndex(queryFields, updateFields);
|
||||
} catch (Exception e) {
|
||||
log.error("更新商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody()));
|
||||
log.error("更新商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e);
|
||||
}
|
||||
break;
|
||||
case RESET_GOODS_INDEX:
|
||||
@ -193,7 +196,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
List<EsGoodsIndex> goodsIndices = JSONUtil.toList(goodsIdsJsonStr, EsGoodsIndex.class);
|
||||
goodsIndexService.updateBulkIndex(goodsIndices);
|
||||
} catch (Exception e) {
|
||||
log.error("重置商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody()));
|
||||
log.error("重置商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e);
|
||||
}
|
||||
break;
|
||||
//审核商品
|
||||
@ -219,7 +222,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("删除商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody()));
|
||||
log.error("删除商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e);
|
||||
}
|
||||
break;
|
||||
//规格删除
|
||||
@ -232,8 +235,10 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
try {
|
||||
String storeId = new String(messageExt.getBody());
|
||||
goodsIndexService.deleteIndex(MapUtil.builder(new HashMap<String, Object>()).put("storeId", storeId).build());
|
||||
} catch (RetryException re) {
|
||||
throw re;
|
||||
} catch (Exception e) {
|
||||
log.error("删除店铺商品索引事件执行异常,商品信息 {}", new String(messageExt.getBody()));
|
||||
log.error("删除店铺商品索引事件执行异常,商品信息: " + new String(messageExt.getBody()), e);
|
||||
}
|
||||
break;
|
||||
//商品评价
|
||||
|
@ -0,0 +1,25 @@
|
||||
package cn.lili.common.aop.annotation;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 异常重试注解
|
||||
*
|
||||
* @author paulG
|
||||
* @since 2022/4/26
|
||||
**/
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@Inherited
|
||||
public @interface RetryOperation {
|
||||
/**
|
||||
* 重试次数
|
||||
*/
|
||||
int retryCount() default 3;
|
||||
|
||||
/**
|
||||
* 重试间隔
|
||||
*/
|
||||
int waitSeconds() default 10;
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
package cn.lili.common.aop.interceptor;
|
||||
|
||||
import cn.lili.common.aop.annotation.RetryOperation;
|
||||
import cn.lili.common.exception.RetryException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author paulG
|
||||
* @since 2022/4/26
|
||||
**/
|
||||
@Aspect
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RetryAspect {
|
||||
|
||||
|
||||
@Around(value = "@annotation(retryOperation)")
|
||||
public Object retryOperation(ProceedingJoinPoint joinPoint, RetryOperation retryOperation) throws Throwable {
|
||||
|
||||
Object response = null;
|
||||
int retryCount = retryOperation.retryCount();
|
||||
int waitSeconds = retryOperation.waitSeconds();
|
||||
boolean successful = false;
|
||||
|
||||
do {
|
||||
try {
|
||||
response = joinPoint.proceed();
|
||||
successful = true;
|
||||
} catch (RetryException ex) {
|
||||
log.info("Operation failed, retries remaining: {}", retryCount);
|
||||
retryCount--;
|
||||
if (retryCount < 0) {
|
||||
successful = true;
|
||||
log.error(ex.getMessage());
|
||||
}
|
||||
if (waitSeconds > 0 && !successful) {
|
||||
log.info("Waiting for {} second(s) before next retry", waitSeconds);
|
||||
Thread.sleep(waitSeconds * 1000L);
|
||||
}
|
||||
}
|
||||
} while (!successful);
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package cn.lili.common.exception;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
/**
|
||||
* 如需异常重试,则抛出此异常
|
||||
*
|
||||
* @author paulG
|
||||
* @since 2022/4/26
|
||||
**/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class RetryException extends RuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 7886918292771470846L;
|
||||
|
||||
public RetryException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
@ -363,6 +363,9 @@ public class GoodsServiceImpl extends ServiceImpl<GoodsMapper, Goods> implements
|
||||
queryWrapper.in(Goods::getId, goodsIds);
|
||||
List<Goods> goodsList = this.list(queryWrapper);
|
||||
for (Goods goods : goodsList) {
|
||||
if (GoodsStatusEnum.DOWN.equals(goodsStatusEnum)) {
|
||||
cache.remove(CachePrefix.GOODS.getPrefix() + goods.getId());
|
||||
}
|
||||
goodsSkuService.updateGoodsSkuStatus(goods);
|
||||
}
|
||||
if (GoodsStatusEnum.DOWN.equals(goodsStatusEnum)) {
|
||||
|
@ -12,6 +12,7 @@ import cn.lili.cache.Cache;
|
||||
import cn.lili.cache.CachePrefix;
|
||||
import cn.lili.common.enums.PromotionTypeEnum;
|
||||
import cn.lili.common.enums.ResultCode;
|
||||
import cn.lili.common.exception.RetryException;
|
||||
import cn.lili.common.exception.ServiceException;
|
||||
import cn.lili.common.properties.RocketmqCustomProperties;
|
||||
import cn.lili.elasticsearch.BaseElasticsearchService;
|
||||
@ -39,7 +40,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.assertj.core.util.IterableUtil;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
@ -48,6 +48,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
@ -268,7 +269,15 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
script.append("ctx._source.").append(entry.getKey()).append("=").append("'").append(entry.getValue()).append("'").append(";");
|
||||
}
|
||||
update.setScript(new Script(script.toString()));
|
||||
client.updateByQueryAsync(update, RequestOptions.DEFAULT, this.actionListener());
|
||||
update.setConflicts("proceed");
|
||||
try {
|
||||
BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(update, RequestOptions.DEFAULT);
|
||||
if (bulkByScrollResponse.getVersionConflicts() > 0) {
|
||||
throw new RetryException("更新商品索引失败,es内容版本冲突");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("更新商品索引异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -305,13 +314,23 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
*/
|
||||
@Override
|
||||
public void deleteIndex(Map<String, Object> queryFields) {
|
||||
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
||||
for (Map.Entry<String, Object> entry : queryFields.entrySet()) {
|
||||
boolQueryBuilder.filter(QueryBuilders.termsQuery(entry.getKey(), entry.getValue()));
|
||||
}
|
||||
queryBuilder.withQuery(boolQueryBuilder);
|
||||
this.restTemplate.delete(queryBuilder.build(), EsGoodsIndex.class);
|
||||
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
|
||||
deleteByQueryRequest.setQuery(boolQueryBuilder);
|
||||
deleteByQueryRequest.indices(getIndexName());
|
||||
deleteByQueryRequest.setConflicts("proceed");
|
||||
try {
|
||||
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
|
||||
if (bulkByScrollResponse.getVersionConflicts() > 0) {
|
||||
throw new RetryException("删除索引失败,es内容版本冲突");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("删除索引异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -370,6 +389,7 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
goodsIndexRepository.deleteAll();
|
||||
for (EsGoodsIndex goodsIndex : goodsIndexList) {
|
||||
try {
|
||||
log.info("生成商品索引:{}", goodsIndex);
|
||||
addIndex(goodsIndex);
|
||||
resultMap.put(KEY_SUCCESS, resultMap.get(KEY_SUCCESS) + 1);
|
||||
} catch (Exception e) {
|
||||
@ -791,18 +811,4 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
index.setPromotionMapJson(JSONUtil.toJsonStr(goodsCurrentPromotionMap));
|
||||
return index;
|
||||
}
|
||||
|
||||
private ActionListener<BulkByScrollResponse> actionListener() {
|
||||
return new ActionListener<BulkByScrollResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
|
||||
log.info("UpdateByQueryResponse: {}", bulkByScrollResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
log.error("UpdateByQueryRequestFailure: ", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ public class EsGoodsSearchServiceImpl implements EsGoodsSearchService {
|
||||
if (!exists) {
|
||||
esGoodsIndexService.init();
|
||||
}
|
||||
if (CharSequenceUtil.isNotEmpty(searchDTO.getKeyword())) {
|
||||
if (CharSequenceUtil.isNotBlank(searchDTO.getKeyword())) {
|
||||
cache.incrementScore(CachePrefix.HOT_WORD.getPrefix(), searchDTO.getKeyword());
|
||||
}
|
||||
NativeSearchQueryBuilder searchQueryBuilder = createSearchQueryBuilder(searchDTO, pageVo);
|
||||
|
Loading…
x
Reference in New Issue
Block a user