【问题标题】:Spark Streaming: Store processed data into elasticsearchSpark Streaming:将处理后的数据存储到 elasticsearch
【发布时间】:2019-03-29 16:15:27
【问题描述】:

我有一个练习,它实现了一项服务,该服务从 Kafka 获取数据,对其进行处理并使用 Spark Streaming 将结果存储到 elasticsearch 中。

我可以将数据从 Kafka 提取到我的服务并在 Spark 集群中对其进行处理,但我不知道如何在操作中将结果持久保存到 elasticsearch 中。截至目前,我的代码如下所示:

SparkConf sparkConf = new SparkConf()...

JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
...
JavaPairInputDStream<String, Event> eventStream = KafkaUtils.createDirectStream(...);
eventStream.foreachRDD( rdd -> {
    rdd.foreach(Application::processEvent);
});

方法 processEvent 看起来像:

public static void processEvent(Tuple2<String, Event> t) {
    //Process event t here
    ...
    // here I want to persist the result into elasticsearch
}

我是 Apache Spark 的新手。请告诉我如何在处理事件后将结果保存到 elasticsearch 中。 请注意,我知道如何在独立的 java 应用程序中将文档索引到 elasticsearch,我只是不知道如何将它与 Spark Streaming 一起使用。

谢谢。

【问题讨论】:

    标签: java apache-spark elasticsearch streaming


    【解决方案1】:

    您需要提供弹性配置并调用saveToEs()将数据从Spark RDD保存到Elastic Search Index。

    以下是scala代码,你可以参考,你也可以用Java实现

    我有以下依赖项:

    libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"
    libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0"
    
    libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.0.0"
    

    然后代码如下:

    // Create SparkConf object with Elastic details
    
    val sparkConf = new SparkConf().setAppName("Job Name")
    sparkConf.set("es.nodes", "Elastic_IP_1, Elastic_IP_2")
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.batch.size.entries", "1000")
    sparkConf.set("es.batch.size.bytes", "102400")
    
    // Create SparkSession using the above SparkConf
    
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    
    ...
    ...
    ...
    
    val myRDD = ......
    
    var elasticConfig = new HashMap[String, String]
    elasticConfig += ("es.mapping.date.detection" -> "false")
    elasticConfig += ("es.mapping.date.rich" -> "false")
    
    import org.elasticsearch.spark._
    
    // Save rdd to elastic search index
    myRDD.saveToEs("indexName", elasticConfig)
    

    【讨论】:

    • @Khode 我已经打开了一个 Streaming Context 用于从 Kafka 获取数据,那么现在我可以打开另一个使用 elasticsearch 的上下文吗?
    • 您不必再创建一个上下文,只需将这些配置添加到现有的上下文中即可,以上是批量执行的示例,对流式进行必要的更改...
    猜你喜欢
    • 2018-08-13
    • 2015-10-18
    • 2017-01-14
    • 1970-01-01
    • 2017-01-27
    • 2016-10-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多