目前 Mongo Spark Connector 不支持直接对 RDD[Document] 执行更新/替换操作。不过可以借助 Connector 提供的 MongoConnector，在 Apache Spark 中按 `_id` 逐条替换 RDD[Document] 对应的 MongoDB 文档，作为变通方案（workaround）。
以下示例代码演示如何更新/替换示例数据。原始数据如下：
db.people.find()
{ "_id" : 100, "name" : "Naga", "age" : 30, "place" : "Bangalore" }
{ "_id" : 101, "name" : "Ravi", "age" : 33, "place" : "Bangalore" }
{ "_id" : 102, "name" : "Hari", "age" : 23, "place" : "Mysore" }
// Spark configuration for a local run using all available cores.
val conf = new SparkConf().setAppName("Spark Mongo").setMaster("local[*]")

// Read source: the `people` collection in the `info` database.
val readOverrides = new HashMap[String, String]()
readOverrides.put("spark.mongodb.input.uri", "mongodb://localhost:27017/info.people")
val readConfig = ReadConfig.create(conf, readOverrides)

val sc = new SparkContext(conf)
val spark = SparkSession.builder().getOrCreate()

// Load every document and add a constant `state` field to each one.
val peopleRDD = MongoSpark.load(sc, readConfig)
val updateRDD = peopleRDD.map { document => document.append("state", "karnataka") }

// Write target: the same collection that was read above.
val writeOverrides = new HashMap[String, String]()
writeOverrides.put("spark.mongodb.output.uri", "mongodb://localhost:27017/info.people")
// NOTE(review): the custom `save` defined below calls replaceOne directly and does not
// appear to read this option, so it has no effect on that code path.
writeOverrides.put("replaceDocument", "false")
val writeConfig = WriteConfig.create(conf, writeOverrides)

// Replace each stored document with its updated version (matched by `_id`).
save(updateRDD, writeConfig)
/**
 * Writes every document of `rdd` back to the collection described by `writeConfig`,
 * replacing the stored document that has the same `_id`.
 *
 * Used as a workaround because the connector's RDD save path does not update or
 * replace existing documents. One `replaceOne` call is issued per document, so the
 * whole stored document is replaced by the RDD's version.
 *
 * @param rdd         documents to write back; each should carry the `_id` of the
 *                    stored document it replaces
 * @param writeConfig connection and collection settings for the target collection
 */
def save(rdd: RDD[Document], writeConfig: WriteConfig): Unit = {
  val mongoConnector = MongoConnector(writeConfig.asOptions)
  rdd.foreachPartition { partition =>
    // Avoid acquiring a collection handle for partitions with nothing to write.
    if (partition.nonEmpty) {
      mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[Document] =>
        partition.foreach { document =>
          // Match on the raw `_id` value so any id type (Double, Int, Long, ObjectId, ...)
          // works. A fixed cast such as `asInstanceOf[Double]` throws for other id types
          // and turns a missing `_id` into 0.0, which could replace an unrelated document.
          val searchDocument = new Document("_id", document.get("_id"))
          collection.replaceOne(searchDocument, document)
        }
      })
    }
  }
}
运行后再次执行 db.people.find()，结果如下：
{ "_id" : 100, "name" : "Naga", "age" : 30, "place" : "Bangalore", "state" : "karnataka" }
{ "_id" : 101, "name" : "Ravi", "age" : 33, "place" : "Bangalore", "state" : "karnataka" }
{ "_id" : 102, "name" : "Hari", "age" : 23, "place" : "Mysore", "state" : "karnataka" }
该方案经验证可行。注意：这里使用 replaceOne 按 `_id` 整体替换文档，因此 RDD 中的每个文档都应包含 `_id` 以及需要保留的全部字段。