ES是一个分布式搜索引擎,其除了用户提供必要的通信服务外,集群间也必须保持紧密的通信联系,才能在必要的时候给出正确的结果。其则必然涉及到各种繁多且要求高的通信场景,那么如何实现高性能的通信,则是其必须要考虑的问题。

  今天,我们就以es的transportService的实现为窥点,观察es的高性能的通信模块实现吧。

 

1. 前言概要

  谈到高性能的网络通信,相信很多人都明白大概是什么道理,或者看过我之前的一些文章,也必然清楚其核心原理。总结来说,其实就是利用io多路复用技术,充分利用带宽,从而达到高性能的目标。

  而具体到java语言上来,能聊的点也许就更少了。比如nio, netty, akka... 

  所以,其实本文所讨论的目标,看起来没有那么神秘,也没必要神秘。我们仅站在研究ES实现细节的方向,去深入理解一些实际的问题,目的仅是为了解惑。

  

2. transportService的初始化

  es中几乎所有的模块,都是在服务启动的时候进行初始化的,这是自然。一来是启动时缓慢一点是可以的,二来是启动的时候有非常多的上下文信息可用非常方便各种初始化,三来是能够提前发现问题而不是运行了很久之后才发现不可解决的问题。

  而transportService是在创建Node时进行初始化的。

    // org.elasticsearch.node.Node#start
    /**
     * Constructs a node
     *
     * @param initialEnvironment         the initial environment for this node, which will be added to by plugins
     * @param classpathPlugins           the plugins to be loaded from the classpath
     * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
     *                                   test framework for tests that rely on being able to set private settings
     */
    protected Node(final Environment initialEnvironment,
                   Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
        ...
        try {
            ...
            new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
            final Transport transport = networkModule.getTransportSupplier().get();
            Set<String> taskHeaders = Stream.concat(
                pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
                Stream.of(Task.X_OPAQUE_ID)
            ).collect(Collectors.toSet());
            // 创建 transportService
            final TransportService transportService = newTransportService(settings, transport, threadPool,
                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
            final GatewayMetaState gatewayMetaState = new GatewayMetaState();
            ...
        } catch (IOException ex) {
            throw new ElasticsearchException("failed to bind service", ex);
        } finally {
            if (!success) {
                IOUtils.closeWhileHandlingException(resourcesToClose);
            }
        }
    }

  即要初始化 transportService , 重点就要看 newTransportService() 如何处理了。在当然了,要进行这个方法的调用,它其实比较多的前提,即各种入参的初始化。重要一点的就是:线程池的创建,transport 的初始化。线程池咱们略去不说,主要是它会在非常多的地方用到,单独在这里讲也不合适。那么就主要看看 transport 是如何初始化的即可。

 

2.1. NetworkModule 的实例化

  从上面的实现中,我们看到要获取 transport 实例,还需要先拿到 networkModule ,这又是如何初始化的呢?

            // 在 Node() 的构造方法中,直接new出来的 。
            final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
                networkService, restController, clusterService.getClusterSettings());
    // org.elasticsearch.common.network.NetworkModule#NetworkModule
    /**
     * Creates a network module that custom networking classes can be plugged into.
     * @param settings The settings for the node
     * @param transportClient True if only transport classes should be allowed to be registered, false otherwise.
     */
    public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                         BigArrays bigArrays,
                         PageCacheRecycler pageCacheRecycler,
                         CircuitBreakerService circuitBreakerService,
                         NamedWriteableRegistry namedWriteableRegistry,
                         NamedXContentRegistry xContentRegistry,
                         NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
                         ClusterSettings clusterSettings) {
        this.settings = settings;
        this.transportClient = transportClient;
        // 这里的 plugin 可能有多个,如 XPackPlugin, Netty4Plugin, Security, VotingOnlyNodePlugin
        for (NetworkPlugin plugin : plugins) {
            Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
            if (transportClient == false) {
                for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                    // 向 transportHttpFactories 中注册相关信息
                    registerHttpTransport(entry.getKey(), entry.getValue());
                }
            }
            Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
                circuitBreakerService, namedWriteableRegistry, networkService);
            for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
                // 向 transportFactories 中注册相关信息
                registerTransport(entry.getKey(), entry.getValue());
            }
            List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
                threadPool.getThreadContext());
            for (TransportInterceptor interceptor : transportInterceptors) {
                // 向 transportIntercetors 中注册拦截器
                registerTransportInterceptor(interceptor);
            }
        }
    }

  可见,整个 NetworkModule 的工作,重点在于注册相关的组件到自身,以便将来取用。这个容器则有可能是 map 形式的,也有可能是 list 形式的。总之,能够起到注册的作用即可。感兴趣的同学可以展开以下查看更多注册实现:

    private final Map<String, Supplier<Transport>> transportFactories = new HashMap<>();
    private final Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();
    private final List<TransportInterceptor> transportIntercetors = new ArrayList<>();

    /** Adds an http transport implementation that can be selected by setting {@link #HTTP_TYPE_KEY}. */
    // TODO: we need another name than "http transport"....so confusing with transportClient...
    private void registerHttpTransport(String key, Supplier<HttpServerTransport> factory) {
        if (transportClient) {
            throw new IllegalArgumentException("Cannot register http transport " + key + " for transport client");
        }
        if (transportHttpFactories.putIfAbsent(key, factory) != null) {
            throw new IllegalArgumentException("transport for name: " + key + " is already registered");
        }
    }
    /** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
    private void registerTransport(String key, Supplier<Transport> factory) {
        if (transportFactories.putIfAbsent(key, factory) != null) {
            throw new IllegalArgumentException("transport for name: " + key + " is already registered");
        }
    }

    /**
     * Registers a new {@link TransportInterceptor}
     */
    private void registerTransportInterceptor(TransportInterceptor interceptor) {
        this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
    }
View Code

相关文章:

  • 2021-12-14
  • 2021-12-24
  • 2021-08-09
  • 2022-12-23
  • 2021-04-28
  • 2022-01-01
  • 2021-04-16
  • 2021-06-27
猜你喜欢
  • 2021-12-21
  • 2021-04-09
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2019-06-30
  • 2021-07-08
相关资源
相似解决方案