【问题标题】:Using Spark, is there a way to bulk unset a field in Mongo documents?使用 Spark,有没有办法批量取消设置 Mongo 文档中的字段?
【发布时间】:2021-07-25 02:10:19
【问题描述】:

我有一个 scala Spark 应用程序,我想在将更新的数据加载到集合之前取消设置 Mongo 集合中所有文档的字段。

假设我有一个这样的数据源,我想从所有文档中删除“排名”字段(有些可能有这个字段,有些可能没有)。

[
  { 
    "_id": 123,
    "value": "a"
  },
  { 
    "_id": 234,
    "value": "b",
    "rank": 1
  },
 ...
]

我知道在 mongo 中有一个 unset 函数,但我在 mongo spark connector 中没有看到任何关于如何使用 Spark 执行此类操作的文档。

在保存到 Mongo 之前,我尝试过滤掉该字段并将其放入数据集中,但我遇到了以下错误:

com.mongodb.MongoBulkWriteException: Bulk write operation error on server localhost:58200. Write errors: [BulkWriteError{index=0, code=9, message=''$set' is empty. You must specify a field like so: {$set: {<field>: ...}}', details={}}]. 
    at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)
    ...

我有以下定义:

case class Item(_id: Int, rank: Option[Int])

val idCol = new ColumnName("_id")
val rankCol = new ColumnName("rank")

以及在同一个类中执行类似操作的函数:

def resetRanks(): {
  val records = MongoSpark
          .load[Item](
            sparkSession,
            ReadConfig(
              Map(
                "collection" -> mongoConfig.collection,
                "database" -> mongoConfig.db,
                "uri" -> mongoConfig.uri
              ),
              Some(ReadConfig(sparkSession))
            )
          )
          .select(idCol, rankCol)
          .repartition(sparkConfig.partitionSize, $"_id")
          .where(rankCol.isNotNull)
          .drop(rankCol)

  MongoSpark.save(
        records,
        WriteConfig( 
          Map(
            "collection" -> mongoConfig.collection,
            "database" -> mongoConfig.db,
            "forceInsert" -> "false", 
            "ordered" -> "true",
            "replaceDocument" -> "false", // not replacing docs since there are other fields I'd like to keep intact that I won't be modifying
            "uri" -> mongoConfig.uri,
            "writeConcern.w" -> "majority"
          ),
          Some(WriteConfig(sparkSession))
        )
      )
}

我正在使用 MongoSparkConnector v2.4.2。

我还看到了这个thread,这似乎表明我收到上述错误的原因是我不能有空字段,但我需要取消设置这些字段,所以我不知道该怎么走关于它。

感谢任何提示或指示。

【问题讨论】:

    标签: mongodb scala apache-spark


    【解决方案1】:

    您可以尝试这样的方法,您可以从数据框中删除列并写入新集合。我在这里观察到的一个问题是,在尝试编写以保存收藏时,我的收藏被丢弃了,也许您可​​以从那里进行研究。

    这里我直接使用dataframeWriter保存功能。您可以随意使用常规的 MongoSpark.save() 函数和 WriteConfig。

    我正在使用 Spark 3.1.2、Mongo-Spark 连接器 3.0.1、Mongo 4.2.6

    case class Item(id: Int, rank: Option[Int], value: String = "abc")
    
    def main(args: Array[String]): Unit = {
        val sparkSession = getSparkSession(args)
        val items = MongoSpark.load[Item](sparkSession, ReadConfig(Map("collection" -> "items"), Some(ReadConfig(sparkSession))))
        items.show()
        val dropped = items.drop("rank")
        dropped.write.option("collection", "items-updated").mode("overwrite").format("mongo").save()
        dropped.show()
      }
    

    【讨论】:

    • 这是一个有趣的想法,但重新创建集合对我来说不是一个选项,因为有很多字段我没有阅读,但我想保留在当前集合中('value' 字段是这里有一个例子)。集合大小约为 1 亿个文档,所以这不是我想重新创建的小集合。
    猜你喜欢
    • 2020-04-21
    • 2015-07-17
    • 1970-01-01
    • 1970-01-01
    • 2017-07-07
    • 1970-01-01
    • 2017-04-08
    • 2023-04-10
    • 2021-03-24
    相关资源
    最近更新 更多