【Question title】: Elasticsearch Connector as Source in Flink
【Posted】: 2019-01-23 14:19:28
【Question】:

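I use the Elasticsearch connector as a sink to insert data into Elasticsearch (see: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html).

However, I could not find any connector for reading data from Elasticsearch as a source.

Is there a connector, or an example, for using Elasticsearch documents as a source in a Flink pipeline?

Regards,

Ali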

【Question discussion】:

    Tags: elasticsearch apache-flink


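    【Solution 1】:

    I'm not aware of an explicit ES source for Flink. I did see one user talking about using elasticsearch-hadoop as a HadoopInputFormat with Flink, but I don't know whether that worked for them (see their code).

    【Discussion】: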

      【Solution 2】:

      I ended up defining a simple function that reads from Elasticsearch:

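          import com.fasterxml.jackson.databind.ObjectMapper;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.streaming.api.functions.ProcessFunction;
          import org.apache.flink.util.Collector;
          import org.elasticsearch.action.search.SearchResponse;
          import org.elasticsearch.action.search.SearchType;
          import org.elasticsearch.client.transport.TransportClient;
          import org.elasticsearch.common.settings.Settings;
          import org.elasticsearch.common.transport.TransportAddress;
          import org.elasticsearch.index.query.QueryBuilders;
          import org.elasticsearch.search.SearchHit;
          import org.elasticsearch.transport.client.PreBuiltTransportClient;
          import java.net.InetAddress;

          // MetricMeasurement and MetricPrediction are the application's own POJOs.
          public static class ElasticsearchFunction
                  extends ProcessFunction<MetricMeasurement, MetricPrediction> {

              // transient: built in open() on each task instead of being serialized with the function
              private transient TransportClient client;
              private transient ObjectMapper mapper;

              @Override
              public void open(Configuration parameters) throws Exception {
                  // Assumption: empty settings; set e.g. "cluster.name" here if your cluster needs it
                  Settings settings = Settings.EMPTY;
                  // YOUR_IP and PORT_NUMBER are placeholders (the transport port is 9300 by default)
                  client = new PreBuiltTransportClient(settings)
                          .addTransportAddress(new TransportAddress(InetAddress.getByName("YOUR_IP"), PORT_NUMBER));
                  mapper = new ObjectMapper();
              }

              @Override
              public void close() throws Exception {
                  if (client != null) {
                      client.close();
                  }
              }

              @Override
              public void processElement(MetricMeasurement in, Context context, Collector<MetricPrediction> out) throws Exception {
                  MetricPrediction metricPrediction = new MetricPrediction();

                  metricPrediction.setMetricId(in.getMetricId());
                  metricPrediction.setGroupId(in.getGroupId());
                  metricPrediction.setBucket(in.getBucket());

                  // Get the metric measurement from Elasticsearch (at most one hit, see setSize(1))
                  SearchResponse response = client.prepareSearch("YOUR_INDEX_NAME")
                          .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                          .setQuery(QueryBuilders.termQuery("YOUR_TERM", in.getMetricId()))   // Query
                          .setPostFilter(QueryBuilders.rangeQuery("value").from(0L).to(50L))     // Filter
                          .setFrom(0).setSize(1).setExplain(true)
                          .get();

                  for (SearchHit hit : response.getHits().getHits()) {
                      String sourceAsString = hit.getSourceAsString();
                      if (sourceAsString != null) {
                          // Deserialize the stored document and copy its value into the prediction
                          MetricMeasurement obj = mapper.readValue(sourceAsString, MetricMeasurement.class);
                          metricPrediction.setPredictionValue(obj.getValue());
                      }
                  }
                  out.collect(metricPrediction);
              }
          }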
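
Note that the function above looks up one document per incoming element, so it enriches an existing stream rather than acting as a source. If the goal is to read a whole index, the same `setFrom`/`setSize` paging could in principle drive a read loop. The following is only a minimal sketch of that loop in plain Java, not Flink or Elasticsearch API: `PageFetcher`, `PagedReader`, and `PagedReaderDemo` are hypothetical names, and the fetcher stands in for a search call that you would have to supply yourself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for one search call made with setFrom(from).setSize(size). */
interface PageFetcher<T> {
    List<T> fetch(int from, int size) throws Exception;
}

/** Hypothetical helper: reads page after page until an empty page comes back. */
final class PagedReader {
    private PagedReader() {}

    /** Hands every fetched document to {@code out} and returns how many were emitted. */
    static <T> int readAll(PageFetcher<T> fetcher, int pageSize, Consumer<T> out) throws Exception {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        int from = 0;
        while (true) {
            List<T> page = fetcher.fetch(from, pageSize);
            if (page.isEmpty()) {
                return from; // offset reached == number of documents emitted
            }
            page.forEach(out);
            from += page.size();
        }
    }
}

public class PagedReaderDemo {
    public static void main(String[] args) throws Exception {
        // In-memory list used in place of a real index, for illustration only
        List<String> index = Arrays.asList("doc-1", "doc-2", "doc-3", "doc-4", "doc-5");
        List<String> seen = new ArrayList<>();
        int total = PagedReader.<String>readAll(
                (from, size) -> index.subList(Math.min(from, index.size()),
                                              Math.min(from + size, index.size())),
                2, seen::add);
        System.out.println(total + " documents: " + seen);
    }
}
```

In a real job the consumer would be whatever emits records downstream. Keep in mind that `from`/`size` paging in Elasticsearch is capped by `index.max_result_window` (10,000 by default), so for a full export the scroll API or the elasticsearch-hadoop route from the other answers is probably the better fit.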
      

      【Discussion】:

        【Solution 3】:

        Hadoop compatibility + Elasticsearch Hadoop

        https://github.com/cclient/flink-connector-elasticsearch-source

        【Discussion】:
