【Question title】: Client is not connected to any Elasticsearch nodes in Flink
【Posted】: 2017-09-04 03:34:09
【Question】:

I am using Flink 1.1.2 and have added the Elasticsearch dependency in Maven as follows:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
    <version>1.2.0</version>
</dependency>

My program contains the following code, which reads data from Kafka and inserts it into Elasticsearch:

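import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

// JoinedStreamEvent and JoinSchema are the asker's own classes (not shown in the question).
public class ReadFromKafka {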


    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");


        DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new JoinSchema(), properties));

        System.out.println("reading from Kafka");

        message.print();


        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");   // flush inserts after every event
        config.put("cluster.name", "elasticsearch_amar"); // default cluster name

        List<InetSocketAddress> transports = new ArrayList<>();
        // set default connection details (9300 is the Elasticsearch transport port)
        transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

        message.addSink(new ElasticsearchSink<>(config, transports, new ElasticInserter()));

        env.execute();


    } //main

    public static class ElasticInserter implements ElasticsearchSinkFunction<JoinedStreamEvent>{


        @Override
        public void process(JoinedStreamEvent record, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {

            Map<String, Integer> json = new HashMap<>();

            json.put("Time", record.getPatient_id());
            json.put("heart rate", record.getHeartRate());
            json.put("resp rate", record.getRespirationRate());


            IndexRequest rqst = Requests.indexRequest()
                    .index("nyc-places")           // index name
                    .type("popular-locations")     // mapping name
                    .source(json);

            requestIndexer.add(rqst);

        } //process


    } //ElasticInserter

} //ReadFromKafka

I installed Elasticsearch with Homebrew and then started it with the elasticsearch command, as shown below

However, when I start the program, I get the following error

【Comments】:

    Tags: maven elasticsearch apache-flink elasticsearch-plugin flink-streaming


    【Answer 1】:

    My reputation is below 50, so I cannot comment.

    I have a few suggestions:

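    • First, check whether ES is actually running; see Can't Connect to Elasticsearch (through Curl)
    • I suggest starting ES in a Docker container, e.g. docker run -d --name es -p 9200:9200 -p 9300:9300 elasticsearch:2 -Des.network.host=0.0.0.0 (the sink in the question connects to the transport port 9300, so that port has to be published along with the HTTP port 9200)
    • By the way, you can also try setting network.host to 0.0.0.0 in the ES config file elasticsearch.yml (the es. prefix is only needed for -D properties on the command line)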
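
    The first suggestion (check that ES is running and reachable) can also be done from Java before submitting the Flink job. The following is only a minimal sketch: the class name PortCheck and the method isReachable are made up for illustration, and it assumes the default ES ports, 9200 for HTTP and 9300 for the transport protocol that the sink in the question uses. It only tests whether a TCP connection can be opened.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink or Elasticsearch.
final class PortCheck {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, etc.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: ES runs on this machine unless a host is passed as the first argument.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        // Assumption: default ports, 9200 (HTTP) and 9300 (transport).
        for (int port : new int[] {9200, 9300}) {
            System.out.println(host + ":" + port + " reachable: "
                    + isReachable(host, port, 2000));
        }
    }
}
```

    If 9200 is reachable but 9300 is not, the transport port is probably not exposed (for example, not published by the container). An open port does not prove that cluster.name in the sink config matches the running cluster, so that value is still worth comparing against what ES reports on startup.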
