【问题标题】:Updating/Replacing Mongo Documents using Apache Spark使用 Apache Spark 更新/替换 Mongo 文档
【发布时间】:2019-11-29 14:35:18
【问题描述】:

当我们使用 MongoSpark 连接器处理 SparkMongoDB 时,这是一个常见问题。此连接器旨在以批处理方式将文档插入/更新到 MongoDB。使用 Spark 插入/更新文档有三种方法。

  1. RDD[文档]
  2. DataFrame[CaseClass]
  3. 数据集[CaseClass]

dataset 和 dataframe 都支持 insert/update 使用 MangoSpark.save() 方法的文档,而 RDD[Document] 仅支持插入。 所以我们在使用 Mongo Spark 来更新 RDD[Document] 时遇到了问题。

是否有任何解决方案可以使用 Spark 将 RDD[Document] 更新/替换到 MongoDB 中?

【问题讨论】:

    标签: mongodb apache-spark rdd connector


    【解决方案1】:

    目前Mongo Spark Connector不支持更新/替换RDD[Document]。但是有一个workaround解决方案可以在 Connector 的帮助下使用 Apache Spark 更新/替换 Mongo Documents 的 RDD[Document]

    以下是更新/替换示例数据的示例代码:

    db.people.find()

    { "_id" : 100, "name" : "Naga", "age" : 30, "place" : "Bangalore" }

    { "_id" : 101, "name" : "Ravi", "age" : 33, "place" : "Bangalore" }

    { "_id" : 102, "name" : "Hari", "age" : 23, "place" : "迈索尔" }

    val conf = new SparkConf().setAppName("Spark Mongo").setMaster("local[*]") val readOverrides = new HashMap[String, String]() readOverrides.put("spark.mongodb.input.uri", "mongodb://localhost:27017/info.people") val readConfig = ReadConfig.create(conf, readOverrides) val sc = 新 SparkContext(conf) val spark = SparkSession.builder().getOrCreate() val peopleRDD = MongoSpark.load(sc, readConfig) val updateRDD = peopleRDD.map { document => document.append("state", "karnataka") } val writeOverrides = new HashMap[String, String]() writeOverrides.put("spark.mongodb.output.uri", "mongodb://localhost:27017/info.people") writeOverrides.put("replaceDocument", "false") val writeConfig = WriteConfig.create(conf, writeOverrides) 保存(更新RDD,writeConfig) def save(rdd: RDD[Document], writeConfig: WriteConfig): Unit = { val mongoConnector = MongoConnector(writeConfig.asOptions) rdd.foreachPartition { 分区 => { 如果(partition.nonEmpty){ mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[Document] => { partition.foreach { 文档 => { val searchDocument = new Document() searchDocument.append("_id", document.get("_id").asInstanceOf[Double]) collection.replaceOne(搜索文档,文档) } } } }) } } } }

    { "_id" : 100, "name" : "Naga", "age" : 30, "place" : "Bangalore", "state" : "karnataka" }

    { "_id" : 101, "name" : "Ravi", "age" : 33, "place" : "Bangalore", "state" : "karnataka" }

    { "_id" : 102, "name" : "Hari", "age" : 23, "place" : "Mysore", "state" : "karnataka" }

    这是可行的解决方案。

    【讨论】:

      猜你喜欢
      • 2019-03-12
      • 2018-04-25
      • 2015-11-18
      • 2014-09-27
      • 2021-04-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-07
      相关资源
      最近更新 更多