fix: 优化elasticsearch 配置,解决I/O reactor问题。

This commit is contained in:
misworga831 2023-07-28 16:16:16 +08:00
parent 9f17dcafbb
commit e8af34995b
3 changed files with 109 additions and 65 deletions

View File

@ -389,11 +389,17 @@ public abstract class BaseElasticsearchService {
*/ */
protected void deleteIndexRequest(String index) { protected void deleteIndexRequest(String index) {
DeleteIndexRequest deleteIndexRequest = buildDeleteIndexRequest(index); DeleteIndexRequest deleteIndexRequest = buildDeleteIndexRequest(index);
try { client.indices().deleteAsync(deleteIndexRequest, COMMON_OPTIONS, new ActionListener<AcknowledgedResponse>() {
client.indices().delete(deleteIndexRequest, COMMON_OPTIONS); @Override
} catch (IOException e) { public void onResponse(AcknowledgedResponse acknowledgedResponse) {
throw new ElasticsearchException("删除索引 {" + index + "} 失败:" + e.getMessage()); log.info("删除索引 {} 成功", index);
} }
@Override
public void onFailure(Exception e) {
log.error("删除索引 {} 失败", index, e);
}
});
} }
/** /**

View File

@ -8,7 +8,7 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider; import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ConnectionKeepAliveStrategy; import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
@ -39,35 +39,46 @@ public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
@Override @Override
@Bean @Bean
public RestHighLevelClient elasticsearchClient() { public RestHighLevelClient elasticsearchClient() {
RestClientBuilder restBuilder = RestClient
.builder(this.getHttpHosts());
restBuilder.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder
.setKeepAliveStrategy(getConnectionKeepAliveStrategy())
.setMaxConnPerRoute(10).
setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build()));
String username = elasticsearchProperties.getAccount().getUsername(); String username = elasticsearchProperties.getAccount().getUsername();
String password = elasticsearchProperties.getAccount().getPassword(); String password = elasticsearchProperties.getAccount().getPassword();
if (username != null && password != null) { final CredentialsProvider credential = createCredentialsIfNotNull(username, password);
final CredentialsProvider credential = new BasicCredentialsProvider();
credential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restBuilder.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder
.setDefaultCredentialsProvider(credential)
.setKeepAliveStrategy(getConnectionKeepAliveStrategy())
.setMaxConnPerRoute(10)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(Runtime.getRuntime().availableProcessors()).build()));
}
restBuilder.setRequestConfigCallback(requestConfigBuilder -> RestClientBuilder restBuilder = createRestClientBuilderWithConfig(credential);
requestConfigBuilder.setConnectTimeout(1000) //time until a connection with the server is established.
.setSocketTimeout(12 * 1000) //time of inactivity to wait for packets[data] to receive.
.setConnectionRequestTimeout(-1)); //time to fetch a connection from the connection pool 0 for infinite.
client = new RestHighLevelClient(restBuilder); client = new RestHighLevelClient(restBuilder);
return client; return client;
} }
private CredentialsProvider createCredentialsIfNotNull(String username, String password) {
if (username == null || password == null) {
return null;
}
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
return credentialsProvider;
}
private RestClientBuilder createRestClientBuilderWithConfig(CredentialsProvider credentialsProvider) {
return RestClient
.builder(this.getHttpHosts())
.setHttpClientConfigCallback(httpClientBuilder -> configureHttpClientBuilder(httpClientBuilder, credentialsProvider))
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(1000)
.setSocketTimeout(12 * 1000));
}
private HttpAsyncClientBuilder configureHttpClientBuilder(HttpAsyncClientBuilder httpClientBuilder,
CredentialsProvider credentialsProvider) {
httpClientBuilder
.setKeepAliveStrategy(getConnectionKeepAliveStrategy())
.setMaxConnPerRoute(10);
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
}
private HttpHost[] getHttpHosts() { private HttpHost[] getHttpHosts() {
List<String> clusterNodes = elasticsearchProperties.getClusterNodes(); List<String> clusterNodes = elasticsearchProperties.getClusterNodes();
HttpHost[] httpHosts = new HttpHost[clusterNodes.size()]; HttpHost[] httpHosts = new HttpHost[clusterNodes.size()];

View File

@ -50,6 +50,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
@ -71,7 +72,6 @@ import org.springframework.data.elasticsearch.core.SearchPage;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -416,14 +416,20 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
} }
update.setScript(new Script(script.toString())); update.setScript(new Script(script.toString()));
update.setConflicts("proceed"); update.setConflicts("proceed");
try {
BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(update, RequestOptions.DEFAULT); this.client.updateByQueryAsync(update, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
if (bulkByScrollResponse.getVersionConflicts() > 0) { if (bulkByScrollResponse.getVersionConflicts() > 0) {
throw new RetryException("更新商品索引失败es内容版本冲突"); throw new RetryException("更新商品索引失败es内容版本冲突");
} }
} catch (IOException e) { }
@Override
public void onFailure(Exception e) {
log.error("更新商品索引异常", e); log.error("更新商品索引异常", e);
} }
});
} }
/** /**
@ -433,7 +439,6 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
*/ */
@Override @Override
public void updateBulkIndex(List<EsGoodsIndex> goodsIndices) { public void updateBulkIndex(List<EsGoodsIndex> goodsIndices) {
try {
//索引名称拼接 //索引名称拼接
String indexName = getIndexName(); String indexName = getIndexName();
@ -447,10 +452,19 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
updateRequest.doc(jsonObject); updateRequest.doc(jsonObject);
request.add(updateRequest); request.add(updateRequest);
} }
client.bulk(request, RequestOptions.DEFAULT); this.client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
} catch (IOException e) { @Override
public void onResponse(BulkResponse bulkItemResponses) {
if (bulkItemResponses.hasFailures()) {
throw new RetryException("批量更新商品索引失败es内容版本冲突");
}
}
@Override
public void onFailure(Exception e) {
log.error("批量更新商品索引异常", e); log.error("批量更新商品索引异常", e);
} }
});
} }
/** /**
@ -469,14 +483,21 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
deleteByQueryRequest.setQuery(boolQueryBuilder); deleteByQueryRequest.setQuery(boolQueryBuilder);
deleteByQueryRequest.indices(getIndexName()); deleteByQueryRequest.indices(getIndexName());
deleteByQueryRequest.setConflicts("proceed"); deleteByQueryRequest.setConflicts("proceed");
try { this.client.deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
if (bulkByScrollResponse.getVersionConflicts() > 0) { if (bulkByScrollResponse.getVersionConflicts() > 0) {
throw new RetryException("删除索引失败es内容版本冲突"); throw new RetryException("删除索引失败es内容版本冲突");
} }
} catch (IOException e) {
log.error("删除索引异常", e);
} }
@Override
public void onFailure(Exception e) {
throw new RetryException("删除索引失败," + e.getMessage());
}
});
} }
/** /**
@ -883,16 +904,22 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements
if (bulkRequest.requests().isEmpty()) { if (bulkRequest.requests().isEmpty()) {
return; return;
} }
try { this.client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
BulkResponse responses = this.client.bulk(bulkRequest, RequestOptions.DEFAULT); @Override
if (responses.hasFailures()) { public void onResponse(BulkResponse bulkItemResponses) {
log.info("批量更新商品索引的促销信息中出现部分异常:{}", responses.buildFailureMessage()); if (bulkItemResponses.hasFailures()) {
log.info("批量更新商品索引的促销信息中出现部分异常:{}", bulkItemResponses.buildFailureMessage());
} else { } else {
log.info("批量更新商品索引的促销信息结果:{}", responses.status()); log.info("批量更新商品索引的促销信息结果:{}", bulkItemResponses.status());
} }
} catch (IOException e) { }
@Override
public void onFailure(Exception e) {
log.error("批量更新商品索引的促销信息出现异常!", e); log.error("批量更新商品索引的促销信息出现异常!", e);
} }
});
} }
/** /**