【问题标题】:How to write ElasticsearchSink for Spark Structured streaming如何为 Spark 结构化流编写 ElasticsearchSink
【发布时间】:2018-10-27 20:17:42
【问题描述】:

我正在使用 Spark 结构化流处理来自 Kafka 队列的大量数据并进行一些繁重的 ML 计算,但我需要将结果写入 Elasticsearch。

我尝试使用ForeachWriter,但无法在其中获取SparkContext,另一种选择可能是在ForeachWriter 中使用HTTP Post

现在,我正在考虑编写自己的 ElasticsearchSink。

是否有任何文档可以为 Spark 结构化流创建接收器?

【问题讨论】:

    标签: scala apache-spark elasticsearch spark-structured-streaming


    【解决方案1】:

    如果您使用的是 Spark 2.2+ 和 ES 6.x,那么有一个开箱即用的 ES 接收器:

    df
      .writeStream
      .outputMode(OutputMode.Append())
      .format("org.elasticsearch.spark.sql") 
      .option("es.mapping.id", "mappingId")
      .start("index/type") // index/type
    

    如果你像我一样使用 ES 5.x,你需要实现 EsSinkEsSinkProvider

    EsSinkProvider:

    class EsSinkProvider extends StreamSinkProvider with DataSourceRegister {
    
      override def createSink(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              partitionColumns: Seq[String],
                              outputMode: OutputMode): Sink = {
    
        EsSink(sqlContext, parameters, partitionColumns, outputMode)
      }
    
      override def shortName(): String = "my-es-sink"
    }
    

    EsSink:

    case class ElasticSearchSink(sqlContext: SQLContext,
                                 options: Map[String, String],
                                 partitionColumns: Seq[String],
                                 outputMode: OutputMode)
      extends Sink {
    
    
      override def addBatch(batchId: Long, df: DataFrame): Unit = synchronized {
        val schema = data.schema
        // this ensures that the same query plan will be used
        val rdd: RDD[String] = df.queryExecution.toRdd.mapPartitions { rows =>
          val converter = CatalystTypeConverters.createToScalaConverter(schema)
          rows.map(converter(_).asInstanceOf[Row]).map(_.getAs[String](0))
        }
    
        // from org.elasticsearch.spark.rdd library
        EsSpark.saveJsonToEs(rdd, "index/type", Map("es.mapping.id" -> "mappingId"))
      }
    }
    

    最后,在编写流时,将此提供程序类用作format

    df
      .writeStream
      .queryName("ES-Writer")
      .outputMode(OutputMode.Append())
      .format("path.to.EsProvider")
      .start()
    

    【讨论】:

      【解决方案2】:

      你可以看看ForeachSink。它展示了如何实现 Sink 并将 DataFrame 转换为 RDD(这非常棘手并且有很大的注释)。但是,请注意,Sink API 仍然是私有且不成熟的,将来可能会更改。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-05-04
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-10-06
        • 1970-01-01
        • 2017-03-06
        • 1970-01-01
        相关资源
        最近更新 更多