【发布时间】:2020-10-06 10:47:56
【问题描述】:
我有一个火花流作业,它将从 kafka 读取并通过 Http 请求写入弹性。
我想验证来自 Kafka 的每个请求,并根据业务需要更改有效负载并写入 Elastic Search。
我已经使用 ES Http Request 将数据推送到 Elastic Search 中。有人可以指导我如何通过数据框将数据写入 ES 吗?
代码片段:
val dfInput = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("group.id", sourceTopicGroupId)
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
.load()
import spark.implicits._
val resultDf = dfInput
.withColumn("value", $"value".cast("string"))
.select("value")
resultDf.writeStream.foreach(new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = true
override def process(value: Row): Unit = {
processEventsData(value.get(0).asInstanceOf[String], deviceIndex, msgIndex, retryOnConflict,auth,refreshInterval,deviceUrl,messageUrl,spark)
}
override def close(errorOrNull: Throwable): Unit = {
}
}).trigger(Trigger.ProcessingTime(triggerPeriod)).start().awaitTermination() //"1 second"
}
这样我们无法实现性能。
有什么办法吗?
- Spark 版本 2.3.2
- Kafka 分区 20
- ES 版本 7.7.0
【问题讨论】:
-
提供更多细节,使用的 spark 版本是什么?还有kafka分区等?
-
Spark 版本 - 2.3.2,Kafka 分区 - 20,ES 版本 - 7.7.0
-
有什么理由不使用 Kafka 连接?
-
我想通过 Spark Dataframe 将嵌套的 JSON 插入 Elasticseatrch
标签: apache-spark elasticsearch spark-structured-streaming