diff --git a/framework/src/main/java/cn/lili/elasticsearch/BaseElasticsearchService.java b/framework/src/main/java/cn/lili/elasticsearch/BaseElasticsearchService.java index 7f08ddf7..e5976870 100644 --- a/framework/src/main/java/cn/lili/elasticsearch/BaseElasticsearchService.java +++ b/framework/src/main/java/cn/lili/elasticsearch/BaseElasticsearchService.java @@ -389,11 +389,17 @@ public abstract class BaseElasticsearchService { */ protected void deleteIndexRequest(String index) { DeleteIndexRequest deleteIndexRequest = buildDeleteIndexRequest(index); - try { - client.indices().delete(deleteIndexRequest, COMMON_OPTIONS); - } catch (IOException e) { - throw new ElasticsearchException("删除索引 {" + index + "} 失败:" + e.getMessage()); - } + client.indices().deleteAsync(deleteIndexRequest, COMMON_OPTIONS, new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + log.info("删除索引 {} 成功", index); + } + + @Override + public void onFailure(Exception e) { + log.error("删除索引 {} 失败", index, e); + } + }); } /** diff --git a/framework/src/main/java/cn/lili/elasticsearch/config/ElasticsearchConfig.java b/framework/src/main/java/cn/lili/elasticsearch/config/ElasticsearchConfig.java index b80da178..6010a2a4 100644 --- a/framework/src/main/java/cn/lili/elasticsearch/config/ElasticsearchConfig.java +++ b/framework/src/main/java/cn/lili/elasticsearch/config/ElasticsearchConfig.java @@ -8,7 +8,7 @@ import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ConnectionKeepAliveStrategy; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; @@ -39,35 +39,46 @@ public class ElasticsearchConfig extends AbstractElasticsearchConfiguration { @Override @Bean public RestHighLevelClient elasticsearchClient() { - RestClientBuilder restBuilder = RestClient - .builder(this.getHttpHosts()); - restBuilder.setHttpClientConfigCallback(httpClientBuilder -> - httpClientBuilder - .setKeepAliveStrategy(getConnectionKeepAliveStrategy()) - .setMaxConnPerRoute(10). - setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build())); String username = elasticsearchProperties.getAccount().getUsername(); String password = elasticsearchProperties.getAccount().getPassword(); - if (username != null && password != null) { - final CredentialsProvider credential = new BasicCredentialsProvider(); - credential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); - restBuilder.setHttpClientConfigCallback(httpClientBuilder -> - httpClientBuilder - .setDefaultCredentialsProvider(credential) - .setKeepAliveStrategy(getConnectionKeepAliveStrategy()) - .setMaxConnPerRoute(10) - .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(Runtime.getRuntime().availableProcessors()).build())); - } + final CredentialsProvider credential = createCredentialsIfNotNull(username, password); - restBuilder.setRequestConfigCallback(requestConfigBuilder -> - requestConfigBuilder.setConnectTimeout(1000) //time until a connection with the server is established. - .setSocketTimeout(12 * 1000) //time of inactivity to wait for packets[data] to receive. - .setConnectionRequestTimeout(-1)); //time to fetch a connection from the connection pool 0 for infinite. + RestClientBuilder restBuilder = createRestClientBuilderWithConfig(credential); client = new RestHighLevelClient(restBuilder); return client; } + private CredentialsProvider createCredentialsIfNotNull(String username, String password) { + if (username == null || password == null) { + return null; + } + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + return credentialsProvider; + } + + private RestClientBuilder createRestClientBuilderWithConfig(CredentialsProvider credentialsProvider) { + return RestClient + .builder(this.getHttpHosts()) + .setHttpClientConfigCallback(httpClientBuilder -> configureHttpClientBuilder(httpClientBuilder, credentialsProvider)) + .setRequestConfigCallback(requestConfigBuilder -> + requestConfigBuilder + .setConnectTimeout(1000) + .setSocketTimeout(12 * 1000)); + } + + private HttpAsyncClientBuilder configureHttpClientBuilder(HttpAsyncClientBuilder httpClientBuilder, + CredentialsProvider credentialsProvider) { + httpClientBuilder + .setKeepAliveStrategy(getConnectionKeepAliveStrategy()) + .setMaxConnPerRoute(10); + if (credentialsProvider != null) { + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + return httpClientBuilder; + } + private HttpHost[] getHttpHosts() { List clusterNodes = elasticsearchProperties.getClusterNodes(); HttpHost[] httpHosts = new HttpHost[clusterNodes.size()]; diff --git a/framework/src/main/java/cn/lili/modules/file/plugin/impl/TencentFilePlugin.java b/framework/src/main/java/cn/lili/modules/file/plugin/impl/TencentFilePlugin.java index 0ccd9145..0c5b0599 100644 --- a/framework/src/main/java/cn/lili/modules/file/plugin/impl/TencentFilePlugin.java +++ b/framework/src/main/java/cn/lili/modules/file/plugin/impl/TencentFilePlugin.java @@ -65,7 +65,7 @@ public class TencentFilePlugin implements FilePlugin { * @return */ private String getUrlPrefix() { - return "https://" + ossSetting.getTencentCOSBucket() + ".cos" + ossSetting.getTencentCOSEndPoint() + ".myqcloud.com/"; + return "https://" + ossSetting.getTencentCOSBucket() + ".cos" + ossSetting.getTencentCOSRegion() + ".myqcloud.com/"; } @Override diff --git a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java index e2c34666..312f9441 100644 --- a/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/search/serviceimpl/EsGoodsIndexServiceImpl.java @@ -50,6 +50,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.update.UpdateRequest; @@ -71,7 +72,6 @@ import org.springframework.data.elasticsearch.core.SearchPage; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.stereotype.Service; -import java.io.IOException; import java.lang.reflect.Field; import java.util.*; import java.util.concurrent.TimeUnit; @@ -416,14 +416,20 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements } update.setScript(new Script(script.toString())); update.setConflicts("proceed"); - try { - BulkByScrollResponse bulkByScrollResponse = client.updateByQuery(update, RequestOptions.DEFAULT); - if (bulkByScrollResponse.getVersionConflicts() > 0) { - throw new RetryException("更新商品索引失败,es内容版本冲突"); + + this.client.updateByQueryAsync(update, RequestOptions.DEFAULT, new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + if (bulkByScrollResponse.getVersionConflicts() > 0) { + throw new RetryException("更新商品索引失败,es内容版本冲突"); + } } - } catch (IOException e) { - log.error("更新商品索引异常", e); - } + + @Override + public void onFailure(Exception e) { + log.error("更新商品索引异常", e); + } + }); } /** @@ -433,24 +439,32 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements */ @Override public void updateBulkIndex(List goodsIndices) { - try { - //索引名称拼接 - String indexName = getIndexName(); + //索引名称拼接 + String indexName = getIndexName(); - BulkRequest request = new BulkRequest(); + BulkRequest request = new BulkRequest(); - for (EsGoodsIndex goodsIndex : goodsIndices) { - UpdateRequest updateRequest = new UpdateRequest(indexName, goodsIndex.getId()); + for (EsGoodsIndex goodsIndex : goodsIndices) { + UpdateRequest updateRequest = new UpdateRequest(indexName, goodsIndex.getId()); - JSONObject jsonObject = JSONUtil.parseObj(goodsIndex); - jsonObject.set("releaseTime", goodsIndex.getReleaseTime()); - updateRequest.doc(jsonObject); - request.add(updateRequest); - } - client.bulk(request, RequestOptions.DEFAULT); - } catch (IOException e) { - log.error("批量更新商品索引异常", e); + JSONObject jsonObject = JSONUtil.parseObj(goodsIndex); + jsonObject.set("releaseTime", goodsIndex.getReleaseTime()); + updateRequest.doc(jsonObject); + request.add(updateRequest); } + this.client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures()) { + throw new RetryException("批量更新商品索引失败,es内容版本冲突"); + } + } + + @Override + public void onFailure(Exception e) { + log.error("批量更新商品索引异常", e); + } + }); } /** @@ -469,14 +483,21 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements deleteByQueryRequest.setQuery(boolQueryBuilder); deleteByQueryRequest.indices(getIndexName()); deleteByQueryRequest.setConflicts("proceed"); - try { - BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); - if (bulkByScrollResponse.getVersionConflicts() > 0) { - throw new RetryException("删除索引失败,es内容版本冲突"); + this.client.deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener() { + + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + if (bulkByScrollResponse.getVersionConflicts() > 0) { + throw new RetryException("删除索引失败,es内容版本冲突"); + } } - } catch (IOException e) { - log.error("删除索引异常", e); - } + + @Override + public void onFailure(Exception e) { + throw new RetryException("删除索引失败," + e.getMessage()); + } + }); + } /** @@ -883,16 +904,22 @@ public class EsGoodsIndexServiceImpl extends BaseElasticsearchService implements if (bulkRequest.requests().isEmpty()) { return; } - try { - BulkResponse responses = this.client.bulk(bulkRequest, RequestOptions.DEFAULT); - if (responses.hasFailures()) { - log.info("批量更新商品索引的促销信息中出现部分异常:{}", responses.buildFailureMessage()); - } else { - log.info("批量更新商品索引的促销信息结果:{}", responses.status()); + this.client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures()) { + log.info("批量更新商品索引的促销信息中出现部分异常:{}", bulkItemResponses.buildFailureMessage()); + } else { + log.info("批量更新商品索引的促销信息结果:{}", bulkItemResponses.status()); + } } - } catch (IOException e) { - log.error("批量更新商品索引的促销信息出现异常!", e); - } + + @Override + public void onFailure(Exception e) { + log.error("批量更新商品索引的促销信息出现异常!", e); + } + }); + } /**