【发布时间】:2018-06-21 15:47:16
【问题描述】:
我正在使用 Apache Spark DataFrame,我想将数据插入到 Elasticsearch 我发现我可以像这样覆盖它们
val df = spark.read.option("header","true").csv("/mnt/data/akc_breed_info.csv")
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","443")
.option("es.net.ssl","true")
.option("es.nodes", esURL)
.option("es.mapping.id", index)
.mode("Overwrite")
.save("index/dogs")
但到目前为止我注意到的是这个命令mode("Overwrite")实际上是删除所有现有的重复数据并插入新数据
有没有办法我可以upsert 他们不删除并重新编写它们?因为我需要几乎实时查询这些数据。提前致谢
【问题讨论】:
标签: scala apache-spark dataframe elasticsearch