【问题标题】:How to use BasicAuth with ElasticSearch Connector on Flink如何在 Flink 上使用 BasicAuth 和 ElasticSearch 连接器
【发布时间】:2017-11-22 18:27:32
【问题描述】:

我想在 flink 上使用弹性生产者,但我在身份验证方面遇到了一些麻烦: 我的弹性搜索集群前面有 Nginx,我在 nginx 中使用基本身份验证。

但是使用弹性搜索连接器,我无法在我的网址中添加基本身份验证(因为 InetSocketAddress)

您是否有使用带有基本身份验证的弹性搜索连接器的想法?

感谢您的宝贵时间。

这是我的代码:

 val configur = new java.util.HashMap[String, String]

    configur.put("cluster.name", "cluster")

    configur.put("bulk.flush.max.actions", "1000")

    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("cluster.com"), 9300))


    jsonOutput.filter(_.nonEmpty).addSink(new ElasticsearchSink(configur,
                                                                transportAddresses,
                                                                new ElasticsearchSinkFunction[String] {
      def createIndexRequest(element: String): IndexRequest = {

        val jsonMap = parse(element).values.asInstanceOf[java.util.HashMap[String, String]]

        return Requests.indexRequest()
          .index("flinkTest")
          .source(jsonMap);
      }

      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
        indexer.add(createIndexRequest(element))
      }
    }))

【问题讨论】:

    标签: elasticsearch apache-flink flink-streaming basic-authentication


    【解决方案1】:

    Flink 使用 Elasticsearch 传输客户端,该客户端使用二进制协议在端口 9300 上连接。 您的 nginx 代理位于端口 9200 上的 HTTP 接口前面。

    Flink 不会使用您的代理,因此无需提供身份验证。

    【讨论】:

    • 谢谢,我不明白有 2 种不同的协议,很抱歉。但是我的flink集群和ES集群不在同一个本地网络(不是同一个provider),所以我需要使用http客户端但是我没有找到这个官方客户端flink/elastic-search所以我需要创建它
    【解决方案2】:

    如果你需要使用 HTTP Client 来连接 Flink 和 Elasticsearch,一个解决方案是使用Jest Library

    你必须创建一个自定义的 SinkFunction,就像这个基本的 java 类:

        package fr.gfi.keenai.streaming.io.sinks.elasticsearch5;
    
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
        import io.searchbox.client.JestClient;
        import io.searchbox.client.JestClientFactory;
        import io.searchbox.client.config.HttpClientConfig;
        import io.searchbox.core.Index;
    
        public class ElasticsearchJestSinkFunction<T> extends RichSinkFunction<T> {
    
            private static final long serialVersionUID = -7831614642918134232L;
    
            private JestClient client;
    
            @Override
            public void invoke(T value) throws Exception {
    
                String document = convertToJsonDocument(value); 
    
                Index index = new Index.Builder(document).index("YOUR_INDEX_NAME").type("YOUR_DOCUMENT_TYPE").build();
                client.execute(index);
    
            }
    
            @Override
            public void open(Configuration parameters) throws Exception {
    
                // Construct a new Jest client according to configuration via factory
                JestClientFactory factory = new JestClientFactory();
                factory.setHttpClientConfig(new HttpClientConfig.Builder("http://localhost:9200")
                        .multiThreaded(true)
                        // Per default this implementation will create no more than 2 concurrent
                        // connections per given route
                        .defaultMaxTotalConnectionPerRoute(2)
                        // and no more 20 connections in total
                        .maxTotalConnection(20)
                        // Basic username and password authentication
                        .defaultCredentials("YOUR_USER", "YOUR_PASSWORD")
                        .build());
                client = factory.getObject();
            }
    
            private String convertToJsonDocument(T value) {
                //TODO
                return "{}";
            }
    
        }
    

    请注意,您还可以使用批量操作来提高速度。

    post 的“将 Flink 连接到 Amazon RS”部分描述了 Flink 的 Jest 实现示例

    【讨论】:

    • 如果您要宣传一项技术,您至少应该展示其用途并阐明这将如何解决 OP 的问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-04
    相关资源
    最近更新 更多