【发布时间】:2017-05-25 15:40:14
【问题描述】:
编辑:此编辑可能会改变此问题的进程。
在 spark 上运行的 mongodb 聚合(特别是使用 $group)在写回集合时会创建重复的 _id 记录。结果,mongodb 抛出重复键错误。顺便说一句,这个查询在 mongo shell 中运行得非常好。
这就是我所做的:
我拿了一个小数据集,并将(聚合)火花代码的结果打印到控制台,而不是写入集合。我打印了完整的结果集,并在 _id 字段中发现了重复项。数据看起来像这样:(已编辑)
Document{{_id=Document{{prodCategory=123},{proId=ABC},{location=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=2223}}
Document{{_id=Document{{prodCategory=123},{proId=ABC},{location=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=123}}.
有很多这样的重复文件。我不明白的是,为什么火花在将其写入集合之前没有整合完整的(地图??)工作?每个分区只是映射记录并将其直接写入集合。这不是它应该如何工作吗?
如果专家对如何解决此问题有任何建议,或者您从下面的原始帖子中看到我的代码中应更改的任何内容。请指教。
原帖:
我有以下收藏。
prodTransactions:
{
_id:
ProdCategory:
product:
location:
customer:
eventTime:
status:
}
我的汇总列出了状态为“完成”的 {ProdCategory-Product-location} 组的所有客户和日期。以下是mongodb代码。
db.prodTransactions.aggregate([
{$match: {status:'complete'}
, {$project:
{
prodId:1,
location:1,
customer:1,
status:1,
eventTime:1,
prodCategory:1
}}
, {$group:
{
_id: {prodCategory: "$prodCategory", lab: "$prodId", location: "$location"},
details: {$addToSet: {customer: "$customer", date: {$dateToString: {format: "%m/%d/%Y", date: "$eventTime"}}, time: {$dateToString: {format: "%H:%M:%S", date: "$eventTime"}}}},
count: {$sum: 1}
}}
, {$out : "prodAgg"}
],{allowDiskUse: true}
)
当我直接在 mongodb 中运行它时,它运行完美,没有问题,并将所有数据保存到 prodAgg 集合中。聚合集合如下所示(数据已编辑):
{
"_id" : {
"prodCategory" : "category1",
"prodId" : "prod1",
"location" : "US-EAST"
},
"details" : [
{
"customer" : "xxx@yyy.com",
"date" : "07/15/2016",
"time" : "14:00:48"
},
{
"customer" : "aaa@bbb.com",
"date" : "07/15/2016",
"time" : "19:05:48"
},
{
"customer" : "ccc@ddd.com",
"date" : "07/15/2016",
"time" : "17:55:48"
},
{
"customer" : "eee@fff.com",
"date" : "07/15/2016",
"time" : "19:20:49"
}
],
"count" : 4.0
}
问题是,如果我从 spark 执行此操作,尝试将其写入集合。它写入了一些文档,然后失败并出现以下异常(数据已编辑):
com.mongodb.MongoBulkWriteException: 批量写入操作出错 服务器 192.168.56.1:27017。写入错误:[BulkWriteError{index=6, code=11000, message='E11000 重复键错误采集: dbname.prodAgg 索引:id 复制键:{ : { prodCategory: "xxx", prodId: “yyyyy”,位置:“美国东部”} }',详细信息={ }}]。在 com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)
过去 3 天,这个错误一直困扰着我,我无法解决这个问题。
我的理解是(我可能错了,但是),组聚合本身不应该有任何重复,那么它如何/为什么会抛出重复键错误。还是‘我在聚合中做错了什么?还是scala代码?
如果那里的任何灵魂以前见过这种情况,请放出一些光,把我从这个漩涡中拉出来。我真的很感激
这是我的 scala 代码。我正在使用
mongodb-spark-connector
import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark._
import org.bson.Document
import com.mongodb.spark.config._
val conf = new SparkConf().setAppName("ProdAnalytics1").setMaster("local").
set("spark.mongodb.input.uri","mongodb://192.168.56.1:27017/dbname.prodTransactions")
.set("spark.mongodb.output.uri", "mongodb://192.168.56.1:27017/dbname.prodAgg")
val sc = new SparkContext(conf)
val rdd = sc.loadFromMongoDB()
val aggRdd = rdd.withPipeline(Seq(
Document.parse("{$match:{status:'end'}"),
Document.parse("{$project: {prodId:1,location:1,customer:1,type:1,eventTime:1,prodCategory:1}}"),
Document.parse("{$group: {_id: {prodCategory: \"$prodCategory\", prodId: \"$prodId\", location: \"$location\"},details: {$addToSet: {customer: \"$customer\", date: \"$eventTime\"}},count: {$sum: 1}}}"),
Document.parse("{$sort: {count : -1}}, {allowDiskUse: true}")))
println(aggRdd.count)
// Using the write Config to Write to DB
val writeConf = WriteConfig(sc)
val writeConfig = WriteConfig(Map("collection" -> "prodAgg", "db" -> "dbname"), Some(writeConf))
MongoSpark.save(aggRdd, writeConfig)
我的 SBT 文件:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
//libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.6.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "1.1.0"
libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
resolvers += "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/ "
resolvers += "releases" at "https://oss.sonatype.org/content/repositories/releases/"
注意:不使用最新版本的火花的原因是,在最新版本中它会抛出另一个异常:
线程“dag-scheduler-event-loop”中的异常 java.lang.NoClassDefFoundError: org/apache/spark/sql/DataFrame
在我的一生中,我无法理解这是什么,我什至没有使用数据框。所以……我就这样吧……如果有人对此有任何建议,我很乐意接受。
非常感谢任何建议...谢谢。
编辑:
这是 scala 代码运行时的 mongo 日志。这是失败前的最后一篇文章(已编辑)
command dbname.ProdTransaction 命令:聚合 { 聚合: “ProdTransaction”,管道:[ { $match: { _id: { $gte: ObjectId('554c949ae4b0d28d51194caf'), $lt: ObjectId('55be257be4b0c3bd1c74e202') } } }, { $match: { $and: [ { 状态:“结束”},{位置:“美国”},{ prodId:{ $nin:[“abc”,“xxx”, "yyy" ] } } ] } }, { $project: { prodId: 1, location: 1, customer: 1, 状态:1,事件时间:1,产品类别:1 } },{$group:{_id:{实验室: “$prodId”,位置:“$location”},详细信息:{ $addToSet:{ prodCategory:“$prodCategory”,用户:“$customer”,日期:“$eventTime”} }, count: { $sum: 1 } } }, { $sort: { count: -1 } } ] cursor: {} } cursorid:258455518867 keyUpdates:0 writeConflicts:0 numYields:1335 reslen:4092243 锁:{ Global: { acquireCount: { r: 2694 } },
数据库:{acquireCount:{r:1347}},集合:{acquireCount:{ r: 1347 } } } 协议:op_query 1155ms
【问题讨论】:
-
你应该回到db并检查是否有任何索引并删除
prodAgg集合的索引并重新开始。 -
@SagarReddy:这是一个在运行时创建的新集合。所以这在执行之前是不存在的。
-
我认为您的问题不在于您插入的数据。看起来数据库收集状态不干净。在数据库上运行
db.prodAgg.getIndexes()会得到什么? -
这是我看到的(已编辑)。你看到的任何差异?
MongoDB Enterprise > db.prodAgg.getIndexes() [ { "v" : 1, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "dbname.prodAgg" } ] -
这里有几点需要注意。您的 shell 输出与聚合代码中的不同。我认为系统不会分配任何 ID,因为您的分组键将是您的集合 ID。所以帮助我理解聚合代码是否将任何数据插入到集合中?您可能想要剥离所有代码并尝试基本聚合以查看问题所在。完全摆脱组,看看你是否可以运行聚合。
标签: mongodb scala apache-spark aggregation-framework