【Question title】: Intermittent SocketTimeoutException with elasticsearch-rest-client-7.2.0
【Posted】: 2019-11-28 18:55:35
【Question】:

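I am using RestHighLevelClient version 7.2 to connect to an Elasticsearch cluster, also version 7.2. The cluster has 3 master nodes and 2 data nodes. Each data node has 2 cores and 8 GB of memory. I use the code below in my Spring Boot project to create the RestHighLevelClient instance.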

    @Bean(destroyMethod = "close")
    @Qualifier("readClient")
    public RestHighLevelClient readClient() {

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(elasticUser, elasticPass));

        RestClientBuilder builder = RestClient.builder(new HttpHost(elasticHost, elasticPort))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build()));

        // Timeouts are in milliseconds: 30 s to connect, 60 s socket timeout.
        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(30000)
                .setSocketTimeout(60000));

        return new RestHighLevelClient(builder);
    }

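The RestHighLevelClient is a singleton bean. I intermittently get a SocketTimeoutException on both GET and PUT requests. The index size is about 50 MB. I tried increasing the socket timeout, but I still get the same error. Am I missing some configuration? Any help would be appreciated.

【Comments on the question】:

    Tags: elasticsearch elastic-stack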


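    【Solution 1】:

    I just want to share the resolution in case it helps others. I connect to the Elasticsearch cluster through a load balancer. As the RestClientBuilder code in my question shows, I passed only the load balancer host and port. Although the cluster has multiple master nodes, RestClient did not retry my requests when a connection timed out.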

    RestClientBuilder builder = RestClient.builder(new HttpHost(elasticHost, elasticPort))
                    .setHttpClientConfigCallback(httpClientBuilder ->httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build()));
    

    According to the RestClient code, when only a single host is configured, the client does not retry on connection problems. So I changed my code as follows, and it started working.

    RestClientBuilder builder = RestClient.builder(new HttpHost(elasticHost, 9200), new HttpHost(elasticHost, 9201))
            .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    

    For the complete RestClient code, see https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/RestClient.java

    The retry code block in RestClient:

    private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple,
                                        final InternalRequest request,
                                        Exception previousException) throws IOException {
            RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
            HttpResponse httpResponse;
            try {
                httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get();
            } catch(Exception e) {
                RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e);
                onFailure(context.node);
                Exception cause = extractAndWrapCause(e);
                addSuppressedException(previousException, cause);
                if (nodeTuple.nodes.hasNext()) {
                    return performRequest(nodeTuple, request, cause);
                }
                if (cause instanceof IOException) {
                    throw (IOException) cause;
                }
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause);
            }
            ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
            if (responseOrResponseException.responseException == null) {
                return responseOrResponseException.response;
            }
            addSuppressedException(previousException, responseOrResponseException.responseException);
            if (nodeTuple.nodes.hasNext()) {
                return performRequest(nodeTuple, request, responseOrResponseException.responseException);
            }
            throw responseOrResponseException.responseException;
        }
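
    The retry behaviour described above can be reproduced without Elasticsearch. The following is a minimal sketch, assuming a hypothetical NodeCall callback in place of the real HTTP call and plain strings in place of Node objects; FailoverSketch and the host names are illustrative only and are not part of the RestClient API. It mirrors the shape of the excerpt: each node is tried at most once, and a failure is retried only if the iterator still has another node.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.List;

// Illustrative stand-in for the retry loop; not the real RestClient API.
final class FailoverSketch {

    /** Hypothetical callback representing one HTTP attempt against one node. */
    interface NodeCall<T> {
        T execute(String node) throws IOException;
    }

    /**
     * Tries each node at most once, in iterator order. A failed attempt is
     * retried only when another node is still left in the iterator.
     */
    static <T> T performRequest(Iterator<String> nodes, NodeCall<T> call, IOException previous)
            throws IOException {
        String node = nodes.next();
        try {
            return call.execute(node);
        } catch (IOException e) {
            if (previous != null) {
                e.addSuppressed(previous); // keep earlier failures for diagnosis
            }
            if (nodes.hasNext()) {
                return performRequest(nodes, call, e);
            }
            throw e; // no node left: the failure reaches the caller
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulated backend: the first address always times out.
        NodeCall<String> flaky = node -> {
            if (node.equals("lb:9200")) {
                throw new SocketTimeoutException("timeout on " + node);
            }
            return "ok from " + node;
        };

        // Two hosts: the timeout on the first is absorbed by the second.
        System.out.println(performRequest(List.of("lb:9200", "lb:9201").iterator(), flaky, null));

        // One host: the same timeout is thrown straight to the caller.
        try {
            performRequest(List.of("lb:9200").iterator(), flaky, null);
        } catch (IOException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

    With a single configured host the iterator is exhausted after the first attempt, so there is nothing to fail over to. That is consistent with what I observed when only the load balancer address was configured.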
    

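    【Comments】:

      【Solution 2】:

      I ran into the same problem, and after reading this I realized that the retry was also happening on my side, once per host (I have 3 hosts, and the exception occurred in 3 threads). I am posting this because you may be facing the same issue, or others may land on this post because of the same SocketConnection exception.

      Searching the official documentation: HighLevelRestClient uses RestClient under the hood, and RestClient uses a CloseableHttpAsyncClient, which has a connection pool. The Elasticsearch documentation says you should close the client when you are done with it (what "done" means within an application is ambiguous), but the general advice I found online is to close it when the application shuts down, not after each query.

      The official Apache documentation has an example of managing the connection pool, which I am trying to follow. I will try to reproduce the scenario and post an update if it solves my problem. The example code is here:

      https://hc.apache.org/httpcomponents-asyncclient-dev/httpasyncclient/examples/org/apache/http/examples/nio/client/AsyncClientEvictExpiredConnections.java

      This is what I have so far: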

      @Bean(name = "RestHighLevelClientWithCredentials", destroyMethod = "close")
      public RestHighLevelClient elasticsearchClient(ElasticSearchClientConfiguration elasticSearchClientConfiguration,
                                                     RestClientBuilder.HttpClientConfigCallback httpClientConfigCallback) {
          return new RestHighLevelClient(
                  RestClient
                          .builder(getElasticSearchHosts(elasticSearchClientConfiguration))
                          .setHttpClientConfigCallback(httpClientConfigCallback)
          );
      }
      
      @Bean
      @RefreshScope
      public RestClientBuilder.HttpClientConfigCallback getHttpClientConfigCallback(
              PoolingNHttpClientConnectionManager poolingNHttpClientConnectionManager,
              CredentialsProvider credentialsProvider
      ) {
          return httpAsyncClientBuilder -> {
              httpAsyncClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
              httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
              httpAsyncClientBuilder.setConnectionManager(poolingNHttpClientConnectionManager);
              return httpAsyncClientBuilder;
          };
      }
      

      public class ElasticSearchClientManager {

      private ElasticSearchClientManager.IdleConnectionEvictor idleConnectionEvictor;
      
      /**
       * Custom client connection manager to create a connection watcher
       *
       * @param elasticSearchClientConfiguration elasticSearchClientConfiguration
       * @return PoolingNHttpClientConnectionManager
       */
      @Bean
      @RefreshScope
      public PoolingNHttpClientConnectionManager getPoolingNHttpClientConnectionManager(
              ElasticSearchClientConfiguration elasticSearchClientConfiguration
      ) {
          try {
              SSLIOSessionStrategy sslSessionStrategy = new SSLIOSessionStrategy(getTrustAllSSLContext());
              Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
                      .register("http", NoopIOSessionStrategy.INSTANCE)
                      .register("https", sslSessionStrategy)
                      .build();
              ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
              PoolingNHttpClientConnectionManager poolingNHttpClientConnectionManager =
                      new PoolingNHttpClientConnectionManager(ioReactor, sessionStrategyRegistry);
              idleConnectionEvictor = new ElasticSearchClientManager.IdleConnectionEvictor(poolingNHttpClientConnectionManager,
                      elasticSearchClientConfiguration);
              idleConnectionEvictor.start();
              return poolingNHttpClientConnectionManager;
          } catch (IOReactorException e) {
              throw new RuntimeException("Failed to create a watcher for the connection pool", e);
          }
      }
      
      private SSLContext getTrustAllSSLContext() {
          try {
              return new SSLContextBuilder()
                      .loadTrustMaterial(null, (x509Certificates, string) -> true)
                      .build();
          } catch (Exception e) {
              throw new RuntimeException("Failed to create SSL Context with open certificate", e);
          }
      }
      
      public IdleConnectionEvictor.State state() {
          return idleConnectionEvictor.evictorState;
      }
      
      @PreDestroy
      private void finishManager() {
          idleConnectionEvictor.shutdown();
      }
      
      
      public static class IdleConnectionEvictor extends Thread {
      
          private final NHttpClientConnectionManager nhttpClientConnectionManager;
          private final ElasticSearchClientConfiguration elasticSearchClientConfiguration;
      
          @Getter
          private State evictorState;
          private volatile boolean shutdown;
      
          public IdleConnectionEvictor(NHttpClientConnectionManager nhttpClientConnectionManager,
                                       ElasticSearchClientConfiguration elasticSearchClientConfiguration) {
              super();
              this.nhttpClientConnectionManager = nhttpClientConnectionManager;
              this.elasticSearchClientConfiguration = elasticSearchClientConfiguration;
          }
      
          @Override
          public void run() {
              try {
                  while (!shutdown) {
                      synchronized (this) {
                          wait(elasticSearchClientConfiguration.getExpiredConnectionsCheckTime());
                          // Close expired connections
                          nhttpClientConnectionManager.closeExpiredConnections();
                          // Optionally, close connections that have been idle
                          // longer than the configured maximum (in seconds)
                          nhttpClientConnectionManager.closeIdleConnections(elasticSearchClientConfiguration.getMaxTimeIdleConnections(),
                                  TimeUnit.SECONDS);
                          this.evictorState = State.RUNNING;
                      }
                  }
              } catch (InterruptedException ex) {
                  this.evictorState = State.NOT_RUNNING;
              }
          }
      
          private void shutdown() {
              shutdown = true;
              synchronized (this) {
                  notifyAll();
              }
          }
      
          public enum State {
              RUNNING,
              NOT_RUNNING
          }
      }
      

      }
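
      The evictor above can also be expressed with a ScheduledExecutorService instead of a hand-written Thread with wait/notifyAll. This is a minimal sketch, assuming a hypothetical Pool interface that stands in for the two eviction calls made on the connection manager (closeExpiredConnections and closeIdleConnections); the class name, interval, and idle limit are illustrative and not part of my actual configuration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a periodic evictor; Pool is a hypothetical stand-in for the connection manager.
final class ScheduledEvictor implements AutoCloseable {

    /** Hypothetical view of the two eviction calls used by the evictor. */
    interface Pool {
        void closeExpiredConnections();
        void closeIdleConnections(long idleTime, TimeUnit unit);
    }

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "idle-connection-evictor");
                thread.setDaemon(true); // never keep the JVM alive just for eviction
                return thread;
            });

    ScheduledEvictor(Pool pool, long checkIntervalMillis, long maxIdleSeconds) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                pool.closeExpiredConnections();
                pool.closeIdleConnections(maxIdleSeconds, TimeUnit.SECONDS);
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so swallow it here.
            }
        }, checkIntervalMillis, checkIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the periodic sweep; intended to be called at application shutdown. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        Pool demoPool = new Pool() {
            @Override
            public void closeExpiredConnections() {
                System.out.println("closing expired connections");
            }

            @Override
            public void closeIdleConnections(long idleTime, TimeUnit unit) {
                System.out.println("closing connections idle for more than " + idleTime + " " + unit);
            }
        };
        // Sweep every 100 ms for a short while, then stop the evictor.
        try (ScheduledEvictor evictor = new ScheduledEvictor(demoPool, 100, 30)) {
            Thread.sleep(350);
        }
    }
}
```

      In a Spring context, close() could be wired in the same way as finishManager() above, so the sweep stops when the application shuts down.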

      【Comments】:
