Merge branch 'master' of gitee.com:beijing_hongye_huicheng/lilishop
This commit is contained in:
commit
80c8b4b4d4
@ -389,11 +389,17 @@ public abstract class BaseElasticsearchService {
|
||||
*/
|
||||
protected void deleteIndexRequest(String index) {
|
||||
DeleteIndexRequest deleteIndexRequest = buildDeleteIndexRequest(index);
|
||||
try {
|
||||
client.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException("删除索引 {" + index + "} 失败:" + e.getMessage());
|
||||
}
|
||||
client.indices().deleteAsync(deleteIndexRequest, COMMON_OPTIONS, new ActionListener<AcknowledgedResponse>() {
|
||||
@Override
|
||||
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
|
||||
log.info("删除索引 {} 成功", index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
log.error("删除索引 {} 失败", index, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -8,7 +8,7 @@ import org.apache.http.auth.UsernamePasswordCredentials;
|
||||
import org.apache.http.client.CredentialsProvider;
|
||||
import org.apache.http.conn.ConnectionKeepAliveStrategy;
|
||||
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||
import org.apache.http.impl.nio.reactor.IOReactorConfig;
|
||||
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
@ -39,35 +39,46 @@ public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
|
||||
@Override
|
||||
@Bean
|
||||
public RestHighLevelClient elasticsearchClient() {
|
||||
RestClientBuilder restBuilder = RestClient
|
||||
.builder(this.getHttpHosts());
|
||||
restBuilder.setHttpClientConfigCallback(httpClientBuilder ->
|
||||
httpClientBuilder
|
||||
.setKeepAliveStrategy(getConnectionKeepAliveStrategy())
|
||||
.setMaxConnPerRoute(10).
|
||||
setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build()));
|
||||
String username = elasticsearchProperties.getAccount().getUsername();
|
||||
String password = elasticsearchProperties.getAccount().getPassword();
|
||||
if (username != null && password != null) {
|
||||
final CredentialsProvider credential = new BasicCredentialsProvider();
|
||||
credential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
||||
restBuilder.setHttpClientConfigCallback(httpClientBuilder ->
|
||||
httpClientBuilder
|
||||
.setDefaultCredentialsProvider(credential)
|
||||
.setKeepAliveStrategy(getConnectionKeepAliveStrategy())
|
||||
.setMaxConnPerRoute(10)
|
||||
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(Runtime.getRuntime().availableProcessors()).build()));
|
||||
}
|
||||
final CredentialsProvider credential = createCredentialsIfNotNull(username, password);
|
||||
|
||||
restBuilder.setRequestConfigCallback(requestConfigBuilder ->
|
||||
requestConfigBuilder.setConnectTimeout(1000) //time until a connection with the server is established.
|
||||
.setSocketTimeout(12 * 1000) //time of inactivity to wait for packets[data] to receive.
|
||||
.setConnectionRequestTimeout(-1)); //time to fetch a connection from the connection pool 0 for infinite.
|
||||
RestClientBuilder restBuilder = createRestClientBuilderWithConfig(credential);
|
||||
|
||||
client = new RestHighLevelClient(restBuilder);
|
||||
return client;
|
||||
}
|
||||
|
||||
private CredentialsProvider createCredentialsIfNotNull(String username, String password) {
|
||||
if (username == null || password == null) {
|
||||
return null;
|
||||
}
|
||||
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
||||
return credentialsProvider;
|
||||
}
|
||||
|
||||
private RestClientBuilder createRestClientBuilderWithConfig(CredentialsProvider credentialsProvider) {
|
||||
return RestClient
|
||||
.builder(this.getHttpHosts())
|
||||
.setHttpClientConfigCallback(httpClientBuilder -> configureHttpClientBuilder(httpClientBuilder, credentialsProvider))
|
||||
.setRequestConfigCallback(requestConfigBuilder ->
|
||||
requestConfigBuilder
|
||||
.setConnectTimeout(1000)
|
||||
.setSocketTimeout(12 * 1000));
|
||||
}
|
||||
|
||||
private HttpAsyncClientBuilder configureHttpClientBuilder(HttpAsyncClientBuilder httpClientBuilder,
|
||||
CredentialsProvider credentialsProvider) {
|
||||
httpClientBuilder
|
||||
.setKeepAliveStrategy(getConnectionKeepAliveStrategy())
|
||||
.setMaxConnPerRoute(10);
|
||||
if (credentialsProvider != null) {
|
||||
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
||||
}
|
||||
return httpClientBuilder;
|
||||
}
|
||||
|
||||
private HttpHost[] getHttpHosts() {
|
||||
List<String> clusterNodes = elasticsearchProperties.getClusterNodes();
|
||||
HttpHost[] httpHosts = new HttpHost[clusterNodes.size()];
|
||||
|
@ -65,7 +65,7 @@ public class TencentFilePlugin implements FilePlugin {
|
||||
* @return
|
||||
*/
|
||||
private String getUrlPrefix() {
|
||||
return "https://" + ossSetting.getTencentCOSBucket() + ".cos" + ossSetting.getTencentCOSEndPoint() + ".myqcloud.com/";
|
||||
return "https://" + ossSetting.getTencentCOSBucket() + ".cos" + ossSetting.getTencentCOSRegion() + ".myqcloud.com/";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -50,6 +50,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
@ -71,7 +72,6 @@ import org.springframework.data.elasticsearch.core.SearchPage;
|
||||
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -416,14 +416,20 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
}
|
||||
update.setScript(new Script(script.toString()));
|
||||
update.setConflicts("proceed");
|
||||
try {
|
||||
BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(update, RequestOptions.DEFAULT);
|
||||
if (bulkByScrollResponse.getVersionConflicts() > 0) {
|
||||
throw new RetryException("更新商品索引失败,es内容版本冲突");
|
||||
|
||||
this.client.updateByQueryAsync(update, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
|
||||
if (bulkByScrollResponse.getVersionConflicts() > 0) {
|
||||
throw new RetryException("更新商品索引失败,es内容版本冲突");
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("更新商品索引异常", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
log.error("更新商品索引异常", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -433,24 +439,32 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
*/
|
||||
@Override
|
||||
public void updateBulkIndex(List<EsGoodsIndex> goodsIndices) {
|
||||
try {
|
||||
//索引名称拼接
|
||||
String indexName = getIndexName();
|
||||
//索引名称拼接
|
||||
String indexName = getIndexName();
|
||||
|
||||
BulkRequest request = new BulkRequest();
|
||||
BulkRequest request = new BulkRequest();
|
||||
|
||||
for (EsGoodsIndex goodsIndex : goodsIndices) {
|
||||
UpdateRequest updateRequest = new UpdateRequest(indexName, goodsIndex.getId());
|
||||
for (EsGoodsIndex goodsIndex : goodsIndices) {
|
||||
UpdateRequest updateRequest = new UpdateRequest(indexName, goodsIndex.getId());
|
||||
|
||||
JSONObject jsonObject = JSONUtil.parseObj(goodsIndex);
|
||||
jsonObject.set("releaseTime", goodsIndex.getReleaseTime());
|
||||
updateRequest.doc(jsonObject);
|
||||
request.add(updateRequest);
|
||||
}
|
||||
client.bulk(request, RequestOptions.DEFAULT);
|
||||
} catch (IOException e) {
|
||||
log.error("批量更新商品索引异常", e);
|
||||
JSONObject jsonObject = JSONUtil.parseObj(goodsIndex);
|
||||
jsonObject.set("releaseTime", goodsIndex.getReleaseTime());
|
||||
updateRequest.doc(jsonObject);
|
||||
request.add(updateRequest);
|
||||
}
|
||||
this.client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkItemResponses) {
|
||||
if (bulkItemResponses.hasFailures()) {
|
||||
throw new RetryException("批量更新商品索引失败,es内容版本冲突");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
log.error("批量更新商品索引异常", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -469,14 +483,21 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
deleteByQueryRequest.setQuery(boolQueryBuilder);
|
||||
deleteByQueryRequest.indices(getIndexName());
|
||||
deleteByQueryRequest.setConflicts("proceed");
|
||||
try {
|
||||
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
|
||||
if (bulkByScrollResponse.getVersionConflicts() > 0) {
|
||||
throw new RetryException("删除索引失败,es内容版本冲突");
|
||||
this.client.deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
|
||||
if (bulkByScrollResponse.getVersionConflicts() > 0) {
|
||||
throw new RetryException("删除索引失败,es内容版本冲突");
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("删除索引异常", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RetryException("删除索引失败," + e.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -883,16 +904,22 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
|
||||
if (bulkRequest.requests().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
BulkResponse responses = this.client.bulk(bulkRequest, RequestOptions.DEFAULT);
|
||||
if (responses.hasFailures()) {
|
||||
log.info("批量更新商品索引的促销信息中出现部分异常:{}", responses.buildFailureMessage());
|
||||
} else {
|
||||
log.info("批量更新商品索引的促销信息结果:{}", responses.status());
|
||||
this.client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkItemResponses) {
|
||||
if (bulkItemResponses.hasFailures()) {
|
||||
log.info("批量更新商品索引的促销信息中出现部分异常:{}", bulkItemResponses.buildFailureMessage());
|
||||
} else {
|
||||
log.info("批量更新商品索引的促销信息结果:{}", bulkItemResponses.status());
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("批量更新商品索引的促销信息出现异常!", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
log.error("批量更新商品索引的促销信息出现异常!", e);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user