【问题标题】:Spark Streaming Kafka to ESSpark Streaming Kafka 到 ES
【发布时间】:2020-10-06 10:47:56
【问题描述】:

我有一个火花流作业,它将从 kafka 读取并通过 Http 请求写入弹性。

我想验证来自 Kafka 的每个请求,并根据业务需要更改有效负载并写入 Elastic Search。

我已经使用 ES Http Request 将数据推送到 Elastic Search 中。有人可以指导我如何通过数据框将数据写入 ES 吗?

代码片段:

val dfInput = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("group.id", sourceTopicGroupId)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

import spark.implicits._

val resultDf = dfInput
  .withColumn("value", $"value".cast("string"))
  .select("value")

resultDf.writeStream.foreach(new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
    processEventsData(value.get(0).asInstanceOf[String], deviceIndex, msgIndex, retryOnConflict,auth,refreshInterval,deviceUrl,messageUrl,spark)
  }

  override def close(errorOrNull: Throwable): Unit = {
  }
}).trigger(Trigger.ProcessingTime(triggerPeriod)).start().awaitTermination() //"1 second"
}

这样我们无法实现性能。

有什么办法吗?

  • Spark 版本 2.3.2
  • Kafka 分区 20
  • ES 版本 7.7.0

【问题讨论】:

  • 提供更多细节,使用的 spark 版本是什么?还有kafka分区等?
  • Spark 版本 - 2.3.2,Kafka 分区 - 20,ES 版本 - 7.7.0
  • 有什么理由不使用 Kafka 连接?
  • 我想通过 Spark Dataframe 将嵌套的 JSON 插入 Elasticseatrch

标签: apache-spark elasticsearch spark-structured-streaming


【解决方案1】:

你可以使用elasticsearch-spark-20_2.11,很简单,更多信息es-hadoop

EsSpark.saveJsonToEs(rdd, index, conf)

【讨论】:

    猜你喜欢
    • 2019-08-08
    • 2016-03-12
    • 2018-05-17
    • 1970-01-01
    • 1970-01-01
    • 2017-12-28
    • 1970-01-01
    • 2020-10-29
    • 2019-01-15
    相关资源
    最近更新 更多