【发布时间】:2021-07-25 02:10:19
【问题描述】:
我有一个 scala Spark 应用程序,我想在将更新的数据加载到集合之前取消设置 Mongo 集合中所有文档的字段。
假设我有一个这样的数据源,我想从所有文档中删除“排名”字段(有些可能有这个字段,有些可能没有)。
[
{
"_id": 123,
"value": "a"
},
{
"_id": 234,
"value": "b",
"rank": 1
},
...
]
我知道在 mongo 中有一个 unset 函数,但我在 mongo spark connector 中没有看到任何关于如何使用 Spark 执行此类操作的文档。
在保存到 Mongo 之前,我尝试过滤掉该字段并将其放入数据集中,但我遇到了以下错误:
com.mongodb.MongoBulkWriteException: Bulk write operation error on server localhost:58200. Write errors: [BulkWriteError{index=0, code=9, message=''$set' is empty. You must specify a field like so: {$set: {<field>: ...}}', details={}}].
at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)
...
我有以下定义:
case class Item(_id: Int, rank: Option[Int])
val idCol = new ColumnName("_id")
val rankCol = new ColumnName("rank")
以及在同一个类中执行类似操作的函数:
def resetRanks(): {
val records = MongoSpark
.load[Item](
sparkSession,
ReadConfig(
Map(
"collection" -> mongoConfig.collection,
"database" -> mongoConfig.db,
"uri" -> mongoConfig.uri
),
Some(ReadConfig(sparkSession))
)
)
.select(idCol, rankCol)
.repartition(sparkConfig.partitionSize, $"_id")
.where(rankCol.isNotNull)
.drop(rankCol)
MongoSpark.save(
records,
WriteConfig(
Map(
"collection" -> mongoConfig.collection,
"database" -> mongoConfig.db,
"forceInsert" -> "false",
"ordered" -> "true",
"replaceDocument" -> "false", // not replacing docs since there are other fields I'd like to keep intact that I won't be modifying
"uri" -> mongoConfig.uri,
"writeConcern.w" -> "majority"
),
Some(WriteConfig(sparkSession))
)
)
}
我正在使用 MongoSparkConnector v2.4.2。
我还看到了这个thread,这似乎表明我收到上述错误的原因是我不能有空字段,但我需要取消设置这些字段,所以我不知道该怎么走关于它。
感谢任何提示或指示。
【问题讨论】:
标签: mongodb scala apache-spark