优化es索引更新,把单次更新优化为批量更新
This commit is contained in:
parent
74e5e86eff
commit
194d0b6ac2
@ -144,7 +144,6 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
break;
|
||||
case DELETE_GOODS_INDEX_PROMOTIONS:
|
||||
BasePromotions promotions = JSONUtil.toBean(new String(messageExt.getBody()), BasePromotions.class);
|
||||
log.info("删除索引信息: {}", promotions);
|
||||
if (CharSequenceUtil.isNotEmpty(promotions.getScopeId())) {
|
||||
this.goodsIndexService.deleteEsGoodsPromotionByPromotionId(Arrays.asList(promotions.getScopeId().split(",")), promotions.getId());
|
||||
} else {
|
||||
@ -264,7 +263,7 @@ public class GoodsMessageListener implements RocketMQListener<MessageExt> {
|
||||
this.goodsIndexService.updateEsGoodsIndexAllByList(promotions, esPromotionKey);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("生成商品索引促销信息执行异常",e);
|
||||
log.error("生成商品索引促销信息执行异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@ import cn.lili.modules.goods.entity.dto.GoodsParamsDTO;
|
||||
import cn.lili.modules.promotion.entity.dos.BasePromotions;
|
||||
import cn.lili.modules.promotion.entity.dos.PromotionGoods;
|
||||
import cn.lili.modules.search.entity.dos.EsGoodsIndex;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -25,9 +26,11 @@ public interface EsGoodsIndexService {
|
||||
|
||||
/**
|
||||
* 获取es生成索引进度
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
Map<String, Integer> getProgress();
|
||||
|
||||
/**
|
||||
* 添加商品索引
|
||||
*
|
||||
@ -99,14 +102,13 @@ public interface EsGoodsIndexService {
|
||||
* @param id id(skuId)
|
||||
* @param promotion 促销信息
|
||||
* @param key 促销信息的key
|
||||
* @param price 促销价格
|
||||
*/
|
||||
void updateEsGoodsIndexPromotions(String id, BasePromotions promotion, String key, Double price);
|
||||
UpdateRequest updateEsGoodsIndexPromotions(String id, BasePromotions promotion, String key);
|
||||
|
||||
/**
|
||||
* 更新商品索引的促销信息
|
||||
*
|
||||
* @param ids id(skuId)
|
||||
* @param ids id(skuId)
|
||||
* @param promotion 促销信息
|
||||
* @param key 促销信息的key
|
||||
*/
|
||||
@ -140,7 +142,7 @@ public interface EsGoodsIndexService {
|
||||
/**
|
||||
* 删除索引中指定的促销活动id的促销活动
|
||||
*
|
||||
* @param skuIds 商品skuId
|
||||
* @param skuIds 商品skuId
|
||||
* @param promotionId 促销活动Id
|
||||
*/
|
||||
void deleteEsGoodsPromotionByPromotionId(List<String> skuIds, String promotionId);
|
||||
|
@ -23,9 +23,7 @@ import cn.lili.modules.goods.entity.enums.GoodsStatusEnum;
|
||||
import cn.lili.modules.goods.entity.enums.GoodsWordsTypeEnum;
|
||||
import cn.lili.modules.goods.service.*;
|
||||
import cn.lili.modules.promotion.entity.dos.BasePromotions;
|
||||
import cn.lili.modules.promotion.entity.dos.Pintuan;
|
||||
import cn.lili.modules.promotion.entity.dos.PromotionGoods;
|
||||
import cn.lili.modules.promotion.entity.dos.Seckill;
|
||||
import cn.lili.modules.promotion.entity.enums.PromotionsStatusEnum;
|
||||
import cn.lili.modules.promotion.service.PromotionService;
|
||||
import cn.lili.modules.promotion.tools.PromotionTools;
|
||||
@ -39,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.assertj.core.util.IterableUtil;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
@ -355,20 +354,14 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateEsGoodsIndexPromotions(String id, BasePromotions promotion, String key, Double price) {
|
||||
public UpdateRequest updateEsGoodsIndexPromotions(String id, BasePromotions promotion, String key) {
|
||||
EsGoodsIndex goodsIndex = findById(id);
|
||||
if (goodsIndex != null) {
|
||||
//如果有促销活动开始,则将促销金额写入
|
||||
if (price != null) {
|
||||
goodsIndex.setPromotionPrice(price);
|
||||
} else {
|
||||
//否则促销金额为商品原价
|
||||
goodsIndex.setPromotionPrice(goodsIndex.getPrice());
|
||||
}
|
||||
//更新索引
|
||||
this.updateGoodsIndexPromotion(goodsIndex, key, promotion);
|
||||
return this.updateGoodsIndexPromotion(goodsIndex, key, promotion);
|
||||
} else {
|
||||
log.error("更新索引商品促销信息失败!skuId 为 {} 的索引不存在!", id);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -381,24 +374,40 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
*/
|
||||
@Override
|
||||
public void updateEsGoodsIndexPromotions(List<String> ids, BasePromotions promotion, String key) {
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
log.info("修改商品活动索引");
|
||||
log.info("商品ids: {}", ids);
|
||||
log.info("活动关键字: {}", key);
|
||||
log.info("活动: {}", promotion);
|
||||
for (String id : ids) {
|
||||
this.updateEsGoodsIndexPromotions(id, promotion, key, null);
|
||||
UpdateRequest updateRequest = this.updateEsGoodsIndexPromotions(id, promotion, key);
|
||||
if (updateRequest != null) {
|
||||
bulkRequest.add(updateRequest);
|
||||
}
|
||||
}
|
||||
this.executeBulkUpdateRequest(bulkRequest);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void updateEsGoodsIndexByList(List<PromotionGoods> promotionGoodsList, BasePromotions promotion, String key) {
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
log.info("修改商品活动索引");
|
||||
log.info("促销商品信息: {}", promotionGoodsList);
|
||||
log.info("活动关键字: {}", key);
|
||||
log.info("活动: {}", promotion);
|
||||
if (promotionGoodsList != null) {
|
||||
//循环更新 促销商品索引
|
||||
for (PromotionGoods promotionGoods : promotionGoodsList) {
|
||||
Double price = null;
|
||||
if (promotion instanceof Seckill || promotion instanceof Pintuan) {
|
||||
price = promotionGoods.getPrice();
|
||||
promotion.setStartTime(promotionGoods.getStartTime());
|
||||
promotion.setEndTime(promotionGoods.getEndTime());
|
||||
UpdateRequest updateRequest = this.updateEsGoodsIndexPromotions(promotionGoods.getSkuId(), promotion, key);
|
||||
if (updateRequest != null) {
|
||||
bulkRequest.add(updateRequest);
|
||||
}
|
||||
this.updateEsGoodsIndexPromotions(promotionGoods.getSkuId(), promotion, key, price);
|
||||
}
|
||||
}
|
||||
|
||||
this.executeBulkUpdateRequest(bulkRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -422,15 +431,12 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
} else {
|
||||
//否则是平台活动
|
||||
Iterable<EsGoodsIndex> all = goodsIndexRepository.findAll();
|
||||
// 查询出全部商品
|
||||
//查询出全部商品
|
||||
goodsIndices = new ArrayList<>(IterableUtil.toCollection(all));
|
||||
}
|
||||
List<String> skuIds = goodsIndices.stream().map(EsGoodsIndex::getId).collect(Collectors.toList());
|
||||
this.deleteEsGoodsPromotionByPromotionId(skuIds, promotion.getId());
|
||||
//更新商品索引
|
||||
for (EsGoodsIndex goodsIndex : goodsIndices) {
|
||||
this.updateGoodsIndexPromotion(goodsIndex, key, promotion);
|
||||
}
|
||||
this.updateEsGoodsIndexPromotions(skuIds, promotion, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -456,21 +462,32 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
|
||||
@Override
|
||||
public void deleteEsGoodsPromotionByPromotionId(List<String> skuIds, String promotionId) {
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
log.info("删除商品活动索引");
|
||||
log.info("商品skuIds: {}", skuIds);
|
||||
log.info("活动Id: {}", promotionId);
|
||||
if (skuIds != null && !skuIds.isEmpty()) {
|
||||
for (String skuId : skuIds) {
|
||||
EsGoodsIndex goodsIndex = findById(skuId);
|
||||
//商品索引不为空
|
||||
if (goodsIndex != null) {
|
||||
this.removePromotionByPromotionId(goodsIndex, promotionId);
|
||||
UpdateRequest updateRequest = this.removePromotionByPromotionId(goodsIndex, promotionId);
|
||||
if (updateRequest != null) {
|
||||
bulkRequest.add(updateRequest);
|
||||
}
|
||||
} else {
|
||||
log.error("更新索引商品促销信息失败!skuId 为 【{}】的索引不存在!", skuId);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (EsGoodsIndex goodsIndex : this.goodsIndexRepository.findAll()) {
|
||||
this.removePromotionByPromotionId(goodsIndex, promotionId);
|
||||
UpdateRequest updateRequest = this.removePromotionByPromotionId(goodsIndex, promotionId);
|
||||
if (updateRequest != null) {
|
||||
bulkRequest.add(updateRequest);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.executeBulkUpdateRequest(bulkRequest);
|
||||
|
||||
}
|
||||
|
||||
@ -480,15 +497,16 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
* @param goodsIndex 索引
|
||||
* @param promotionId 促销活动id
|
||||
*/
|
||||
private void removePromotionByPromotionId(EsGoodsIndex goodsIndex, String promotionId) {
|
||||
private UpdateRequest removePromotionByPromotionId(EsGoodsIndex goodsIndex, String promotionId) {
|
||||
Map<String, Object> promotionMap = goodsIndex.getPromotionMap();
|
||||
if (promotionMap != null && !promotionMap.isEmpty()) {
|
||||
//如果存在同促销ID的活动删除
|
||||
List<String> collect = promotionMap.keySet().stream().filter(i -> i.split("-")[1].equals(promotionId)).collect(Collectors.toList());
|
||||
collect.forEach(promotionMap::remove);
|
||||
goodsIndex.setPromotionMap(promotionMap);
|
||||
this.updatePromotionsByScript(goodsIndex.getId(), promotionMap);
|
||||
return this.getGoodsIndexPromotionUpdateRequest(goodsIndex.getId(), promotionMap);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -594,11 +612,7 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
* @param key 关键字
|
||||
* @param promotion 活动
|
||||
*/
|
||||
private void updateGoodsIndexPromotion(EsGoodsIndex goodsIndex, String key, BasePromotions promotion) {
|
||||
log.info("修改商品活动索引");
|
||||
log.info("商品索引: {}", goodsIndex);
|
||||
log.info("关键字: {}", key);
|
||||
log.info("活动: {}", promotion);
|
||||
private UpdateRequest updateGoodsIndexPromotion(EsGoodsIndex goodsIndex, String key, BasePromotions promotion) {
|
||||
Map<String, Object> promotionMap;
|
||||
//数据非空处理,如果空给一个新的信息
|
||||
if (goodsIndex.getPromotionMap() == null || goodsIndex.getPromotionMap().isEmpty()) {
|
||||
@ -613,7 +627,7 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
} else {
|
||||
promotionMap.put(key, promotion);
|
||||
}
|
||||
this.updatePromotionsByScript(goodsIndex.getId(), promotionMap);
|
||||
return this.getGoodsIndexPromotionUpdateRequest(goodsIndex.getId(), promotionMap);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -622,7 +636,7 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
* @param id 索引id
|
||||
* @param promotionMap 促销信息
|
||||
*/
|
||||
private void updatePromotionsByScript(String id, Map<String, Object> promotionMap) {
|
||||
private UpdateRequest getGoodsIndexPromotionUpdateRequest(String id, Map<String, Object> promotionMap) {
|
||||
JSONObject jsonObject = JSONUtil.parseObj(promotionMap);
|
||||
jsonObject.setDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
String s = jsonObject.toString();
|
||||
@ -633,10 +647,27 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
updateRequest.id(id);
|
||||
updateRequest.retryOnConflict(5);
|
||||
updateRequest.script(new Script("ctx._source." + "promotionMap" + "=" + promotionsStr + ";"));
|
||||
return updateRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行批量更新商品索引
|
||||
*
|
||||
* @param bulkRequest 批量请求
|
||||
*/
|
||||
private void executeBulkUpdateRequest(BulkRequest bulkRequest) {
|
||||
if (bulkRequest.requests().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
client.update(updateRequest, RequestOptions.DEFAULT);
|
||||
BulkResponse responses = this.client.bulk(bulkRequest, RequestOptions.DEFAULT);
|
||||
if (responses.hasFailures()) {
|
||||
log.info("批量更新商品索引的促销信息中出现部分异常:{}", responses.buildFailureMessage());
|
||||
} else {
|
||||
log.info("批量更新商品索引的促销信息结果:{}", responses.status());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("更新商品索引促销信息错误", e);
|
||||
log.error("批量更新商品索引的促销信息出现异常!", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user