【问题标题】:Spark Dataframe upsert to ElasticsearchSpark Dataframe upsert 到 Elasticsearch
【发布时间】:2018-06-21 15:47:16
【问题描述】:

我正在使用 Apache Spark DataFrame,我想将数据插入到 Elasticsearch 我发现我可以像这样覆盖它们

val df = spark.read.option("header","true").csv("/mnt/data/akc_breed_info.csv")

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","443")
  .option("es.net.ssl","true")
  .option("es.nodes", esURL)
  .option("es.mapping.id", index)
  .mode("Overwrite")
  .save("index/dogs")

但到目前为止我注意到的是这个命令mode("Overwrite")实际上是删除所有现有的重复数据并插入新数据

有没有办法我可以upsert 他们不删除并重新编写它们?因为我需要几乎实时查询这些数据。提前致谢

【问题讨论】:

    标签: scala apache-spark dataframe elasticsearch


    【解决方案1】:

    mode("Overwrite") 出现问题的原因是,当您覆盖整个数据框时,它会立即删除与您的数据框行匹配的所有数据,看起来整个索引对我来说都是空的,我想出了如何其实upsert

    这是我的代码

    df.write
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes.wan.only","true")
      .option("es.nodes.discovery", "false")
      .option("es.nodes.client.only", "false")
      .option("es.net.ssl","true")
      .option("es.mapping.id", index)
      .option("es.write.operation", "upsert")
      .option("es.nodes", esURL)
      .option("es.port", "443")
      .mode("append")
      .save(path)
    

    请注意,您必须输入"es.write.operation", "upert".mode("append")

    【讨论】:

    • index 的值是多少?
    • @Soumendra 它是 ES 的 mapping id,如图所示。对我来说,它是userId
    【解决方案2】:

    尝试设置:

    es.write.operation = upsert
    

    这应该执行所需的操作。您可以在https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html找到更多详细信息

    【讨论】:

    • 感谢您的回答。我试过了,但它对我不起作用,我也需要输入.mode("append")
    猜你喜欢
    • 2017-01-26
    • 2016-04-11
    • 2021-06-04
    • 1970-01-01
    • 1970-01-01
    • 2017-10-18
    • 1970-01-01
    • 1970-01-01
    • 2015-12-16
    相关资源
    最近更新 更多