【问题标题】:Duplicate key error when trying to write an $group aggregation to mongodb from Spark using scala尝试使用 scala 从 Spark 将 $group 聚合写入 mongodb 时出现重复键错误
【发布时间】:2017-05-25 15:40:14
【问题描述】:

编辑:此编辑可能会改变此问题的进程。

在 spark 上运行的 mongodb 聚合(特别是使用 $group)在写回集合时会创建重复的 _id 记录。结果,mongodb 抛出重复键错误。顺便说一句,这个查询在 mongo shell 中运行得非常好。

这就是我所做的:

我拿了一个小数据集,并将(聚合)火花代码的结果打印到控制台,而不是写入集合。我打印了完整的结果集,并在 _id 字段中发现了重复项。数据看起来像这样:(已编辑)

Document{{_id=Document{{prodCategory=123},{proId=ABC},{‌​location=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=2223}}

Document{{_id=Document{{prodCategory=123},{proId=ABC},{locat‌​ion=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=123}}.

有很多这样的重复文件。我不明白的是,为什么火花在将其写入集合之前没有整合完整的(地图??)工作?每个分区只是映射记录并将其直接写入集合。这不是它应该如何工作吗?

如果专家对如何解决此问题有任何建议,或者您从下面的原始帖子中看到我的代码中应更改的任何内容。请指教。

原帖:

我有以下收藏。

prodTransactions:
{
_id:
ProdCategory:
product:
location:
customer:
eventTime:
status:
}

我的汇总列出了状态为“完成”的 {ProdCategory-Product-location} 组的所有客户和日期。以下是mongodb代码。

db.prodTransactions.aggregate([
{$match: {status:'complete'}
, {$project: 
    {
        prodId:1,
        location:1,
        customer:1,
        status:1,
        eventTime:1,
        prodCategory:1
    }}
, {$group: 
    {
        _id: {prodCategory: "$prodCategory", lab: "$prodId", location: "$location"},
        details: {$addToSet: {customer: "$customer", date: {$dateToString: {format: "%m/%d/%Y", date: "$eventTime"}}, time: {$dateToString: {format: "%H:%M:%S", date: "$eventTime"}}}},
        count: {$sum: 1}
    }}
, {$out : "prodAgg"}
],{allowDiskUse: true}
)

当我直接在 mongodb 中运行它时,它运行完美,没有问题,并将所有数据保存到 prodAgg 集合中。聚合集合如下所示(数据已编辑):

{
    "_id" : {
        "prodCategory" : "category1",
        "prodId" : "prod1",
        "location" : "US-EAST"
    },
    "details" : [ 
        {
            "customer" : "xxx@yyy.com",
            "date" : "07/15/2016",
            "time" : "14:00:48"
        }, 
        {
            "customer" : "aaa@bbb.com",
            "date" : "07/15/2016",
            "time" : "19:05:48"
        }, 
        {
            "customer" : "ccc@ddd.com",
            "date" : "07/15/2016",
            "time" : "17:55:48"
        }, 
        {
            "customer" : "eee@fff.com",
            "date" : "07/15/2016",
            "time" : "19:20:49"
        }
    ],
    "count" : 4.0
}

问题是,如果我从 spark 执行此操作,尝试将其写入集合。它写入了一些文档,然后失败并出现以下异常(数据已编辑):

com.mongodb.MongoBulkWriteException: 批量写入操作出错 服务器 192.168.56.1:27017。写入错误:[BulkWriteError{index=6, code=11000, message='E11000 重复键错误采集: dbname.prodAgg 索引:id 复制键:{ : { prodCategory: "xxx", prodId: “yyyyy”,位置:“美国东部”} }',详细信息={ }}]。在 com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)

过去 3 天,这个错误一直困扰着我,我无法解决这个问题。

我的理解是(我可能错了,但是),组聚合本身不应该有任何重复,那么它如何/为什么会抛出重复键错误。还是‘我在聚合中做错了什么?还是scala代码?

如果那里的任何灵魂以前见过这种情况,请放出一些光,把我从这个漩涡中拉出来。我真的很感激

这是我的 scala 代码。我正在使用

mongodb-spark-connector

import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark._
import org.bson.Document
import com.mongodb.spark.config._

val conf = new SparkConf().setAppName("ProdAnalytics1").setMaster("local").
      set("spark.mongodb.input.uri","mongodb://192.168.56.1:27017/dbname.prodTransactions")
        .set("spark.mongodb.output.uri", "mongodb://192.168.56.1:27017/dbname.prodAgg")

      val sc = new SparkContext(conf)
      val rdd = sc.loadFromMongoDB()

      val aggRdd = rdd.withPipeline(Seq(
      Document.parse("{$match:{status:'end'}"),
      Document.parse("{$project: {prodId:1,location:1,customer:1,type:1,eventTime:1,prodCategory:1}}"),
      Document.parse("{$group: {_id: {prodCategory: \"$prodCategory\", prodId: \"$prodId\", location: \"$location\"},details: {$addToSet: {customer: \"$customer\", date: \"$eventTime\"}},count: {$sum: 1}}}"),
      Document.parse("{$sort: {count : -1}}, {allowDiskUse: true}")))
    println(aggRdd.count)

// Using the write Config to Write to DB
     val writeConf = WriteConfig(sc)
     val writeConfig = WriteConfig(Map("collection" -> "prodAgg", "db" -> "dbname"), Some(writeConf))
     MongoSpark.save(aggRdd, writeConfig)

我的 SBT 文件:

name := "Simple Project"
    version := "1.0"

    scalaVersion := "2.11.7"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"

    //libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.6.1"

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1"

    libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "1.1.0"

    libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1"

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
    resolvers += "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/ "
    resolvers += "releases"  at "https://oss.sonatype.org/content/repositories/releases/"

注意:不使用最新版本的火花的原因是,在最新版本中它会抛出另一个异常:

线程“dag-scheduler-event-loop”中的异常 java.lang.NoClassDefFoundError: org/apache/spark/sql/DataFrame

在我的一生中,我无法理解这是什么,我什至没有使用数据框。所以……我就这样吧……如果有人对此有任何建议,我很乐意接受。

非常感谢任何建议...谢谢。

编辑:

这是 scala 代码运行时的 mongo 日志。这是失败前的最后一篇文章(已编辑

command dbname.ProdTransaction 命令:聚合 { 聚合: “ProdTransaction”,管道:[ { $match: { _id: { $gte: ObjectId('554c949ae4b0d28d51194caf'), $lt: ObjectId('55be257be4b0c3bd1c74e202') } } }, { $match: { $and: [ { 状态:“结束”},{位置:“美国”},{ prodId:{ $nin:[“abc”,“xxx”, "yyy" ] } } ] } }, { $project: { prodId: 1, location: 1, customer: 1, 状态:1,事件时间:1,产品类别:1 } },{$group:{_id:{实验室: “$prodId”,位置:“$location”},详细信息:{ $addToSet:{ prodCategory:“$prodCategory”,用户:“$customer”,日期:“$eventTime”} }, count: { $sum: 1 } } }, { $sort: { count: -1 } } ] cursor: {} } cursorid:258455518867 keyUpdates:0 writeConflicts:0 numYields:1335 reslen:4092243 锁:{ Global: { acquireCount: { r: 2694 } },
数据库:{acquireCount:{r:1347}},集合:{acquireCount:{ r: 1347 } } } 协议:op_query 1155ms

【问题讨论】:

  • 你应该回到db并检查是否有任何索引并删除prodAgg集合的索引并重新开始。
  • @SagarReddy:这是一个在运行时创建的新集合。所以这在执行之前是不存在的。
  • 我认为您的问题不在于您插入的数据。看起来数据库收集状态不干净。在数据库上运行 db.prodAgg.getIndexes() 会得到什么?
  • 这是我看到的(已编辑)。你看到的任何差异? MongoDB Enterprise > db.prodAgg.getIndexes() [ { "v" : 1, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "dbname.prodAgg" } ]
  • 这里有几点需要注意。您的 shell 输出与聚合代码中的不同。我认为系统不会分配任何 ID,因为您的分组键将是您的集合 ID。所以帮助我理解聚合代码是否将任何数据插入到集合中?您可能想要剥离所有代码并尝试基本聚合以查看问题所在。完全摆脱组,看看你是否可以运行聚合。

标签: mongodb scala apache-spark aggregation-framework


【解决方案1】:

这里有很多要回答的问题,所以我将回答分为三个部分;配置,为什么会发生以及如何解决它:

配置问题

线程“dag-scheduler-event-loop”java.lang.NoClassDefFoundError 中的异常:org/apache/spark/sql/DataFrame

当错误版本的 MongoDB Spark 连接器与 Spark 一起使用时会发生。如果您想使用 Spark 2.0.x,那么您需要 2.0.x MongoDB Spark 连接器,例如:

libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0"

(如果您的 Scala 版本不同步,例如将 Scala 2.10 与为 2.11 编译的库一起使用,也会发生这些误导性错误)

关于安装不需要:libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1"。它是一个独立的库,有自己的 MongoClient、Codecs 等,因此与连接器一起使用时可能会导致错误。

为什么你会期望重复的 _ids 与 $group 聚合

Spark 的工作方式是对数据集合进行分区,然后跨 Spark 工作节点并行处理数据。

Mongo Spark 连接器中有各种分区器,它们都默认在 Document 的 _id 键上对集合进行分区。然后在每个分区上执行聚合。

因此,当您在集合的一个分区上运行多个聚合时,您可以合理地预期在生成的 RDD 中会产生重复的 _id。

修复重复的_ids

我可以想到三个可能的解决方法:

A) 鉴于聚合管道的性质,您应该使用 $out 运算符。这更有益,因为数据保留在 MongoDB 本地,您无需维护 Spark 集群。

B) 基于 Spark 的替代方案是在 RDD 上进行进一步处理以合并任何重复的 _id,然后再保存回 MongoDB。

C) 理论上,您可以提供自己的分区程序,该分区程序根据分组的 _id 字段返回分区。实际上,如果不要求分区查询使用性能不高的$in 过滤器,我想不出一个好方法。

【讨论】:

  • 我会试试这些,并会在这里更新我的答案。感谢您花时间详细解释。真的很感激。顺便说一句,我已经尝试过“A”并且它有效。这就是为什么我对为什么它在 Spark 中不起作用感到困惑的原因。但是你所说的(关于从业者)完全有道理。
  • 有没有办法告诉 spark 使用单个分区器?我用我的 conf 声明尝试了以下代码,但它不起作用。它仍在考虑相同数量的分区。 // .setExecutorEnv("spark.default.parallelism", "1").set("spark.default.parallelism", "1").
  • 这是我所做的:我拿了一个小数据集并尝试将 spark 代码的结果打印到控制台而不是写入集合中。我打印了完整的结果集,并在_id field 中找到了重复项。数据看起来像这样:Document{{_id=Document{{prodCategory=123},{proId=ABC},{location=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=2223}} Document{{_id=Document{{prodCategory=123},{proId=ABC},{location=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=123}}。此文档重复 wrt _id
  • 这样的重复文件很多。我不明白的是,为什么在将其写入集合之前,spark 不整合完整的地图作业(所谓的减少是什么?)?每个分区只是映射记录并将其直接写入集合。这不是它应该工作的方式吗?
  • 答:您可以使用测试分区器:com.mongodb.spark.rdd.partitioner.MongoSinglePartitioner 用于单个分区。但是,如果您实际上没有在 Spark 中做任何事情,则根本没有理由查看往返于 Spark 的数据——它只会增加延迟、复杂性和成本。 B:重复文件的原因已经说明。您没有对 Spark 中的 RDD 进行任何操作来对它们进行分组,那么 Spark 怎么知道您想要这样做呢?如果您使用多个分区,则需要在 Spark 中合并结果。如果您使用的是单个分区 - 为什么要使用 Spark?
【解决方案2】:

我可能弄错了,但由于您的聚合返回一个带有键 _id 的对象,因此 mongo 在插入时会尝试将其用作文档的 ID。不确定这是否是您想要的结果...如果不是,只需将 _id 键更改为其他值(id 甚至可以工作)

【讨论】:

  • 你是对的。它实际上是在尝试添加一个 id。从 mongo 日志中查看:{ $match: { _id: { $gte: ObjectId('554c949ae4b0d28d51194caf'), $lt: ObjectId('55be257be4b0c3bd1c74e202') } 加上我的聚合查询是 mongo 试图执行的。还有……
  • 我确实尝试了您提到的解决方案。像这样:{$group: { _id: null, prod: {prodCategory: "$prodCategory", prod: "$prodId", location: "$location"}, details: {$addToSet: {customer: "$customer", date: "$eventTime"}}, count: {$sum: 1} }} 我试图将 _id 设为 null 并将组的名称更改为其他名称。在 mongodb 中运行该查询不喜欢它。它抱怨unknown group operator prod
【解决方案3】:

我做了一些跟踪和错误并缩小了问题的范围(在 Sagar Reddy 的建议的帮助下)。这是我发现的。

如果代码中没有 $group 聚合,聚合将起作用。只需$match, $project, $sort..任何命令组合都可以正常工作。一旦我输入$group,即使只有一个参数,它也会失败。

我认为是,$group 是唯一将 new _id 添加到集合的聚合。没有其他聚合命令会添加 "new" _id。这就是问题所在,这就是问题所在。

我的问题是我的聚合需要$group,如果没有它就无济于事。

如果你们中的任何人对此有解决方案/信息或解决方法,请告知。

非常感谢。

【讨论】:

    【解决方案4】:

    我一直在调查您的 MongoDB 问题,以下是我的一些想法:

    1. 我注意到您的聚合查询中有一个错误 - “{$match:{status:'end'}”中缺少大括号是故意的吗?它应该以双括号结尾 - “{$match:{status:'end'}}”。我在我的错误重现代码中更改了它。
    2. IMO 您的聚合查询不会产生重复的键 - 一旦您运行 Spark 应用程序,就会创建 prodAgg 集合并填充聚合结果,然后如果您第二次运行它,它将产生相同的键,但不会删除旧集合第一的。这是你的问题的原因。

    在 spark 聚合之前添加以下行:

    val mongoClient = new MongoClient() 
    val db = mongoClient.getDatabase("dbname")
    db.getCollection("prodAgg").drop()
    

    如果代码中没有 $group 聚合,聚合就会起作用。只需 $match、$project、$sort..任何命令组合都可以正常工作。一旦我放入 $group,即使只有一个参数,它也会失败。

    我无法重现该行为,但如果我将文档的 _id 排除添加到 $project 运算符,它会按照您所说的那样工作。在那种情况下,我有一个解释 - 当没有源文档的 _id 并且您没有重复时,聚合会创建一个具有新 _id 的新文档。如果不排除投影中的 _id,它应该从源文档继承,然后应该发生重复。

    【讨论】:

    • 谢谢卢卡斯。几天后我会尝试你的所有建议。目前消防另一个问题。非常感谢您的建议。将对此进行更新。谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-07-12
    • 2022-01-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-07
    相关资源
    最近更新 更多