【问题标题】:How to use aggregate in the mongo query pipeline using spark mongo connector如何使用 spark mongo 连接器在 mongo 查询管道中使用聚合
【发布时间】:2021-08-25 13:31:28
【问题描述】:

我正在使用以下代码从 mongo 中获取数据。

pipeline = [{'$match': {'createdDateTime': {'$gte': {'$date': f'{yesterday}T00:00:00Z', '$lte': f'{today}T00:00:00Z'}}},
             {'$project': { '_class' :  {'$ifNull' : ['$_class','']}}}
             }
    ]

df= spark.read.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.input.uri",uri).option("pipeline", pipeline).load()

我不明白这是怎么回事,我得到了以下异常。

IllegalArgumentException: requirement failed: Invalid Aggregation map Map(uri -> mongodb://xxxx:yyyy@mongo.com:27017/DBReport.Application, pipeline -> [{'$match': {'createdDateTime': {'$gte': {'$date': '2021-08-24T00:00:00Z', '$lte': '2021-08-25T00:00:00Z'}}}}, {'$project': {'_class': {'$ifNull': ['$_class', '']}}
    ]

请解释

【问题讨论】:

    标签: mongodb apache-spark pyspark


    【解决方案1】:

    也许你只是漏掉了一些括号。

    试试下面的代码,

    pipeline = [
                 {
                     '$match': {
                         'createdDateTime': {
                             '$gte': {'$date': f'{yesterday}T00:00:00Z'}, 
                             '$lte': {'$date': f'{today}T00:00:00Z'}
                         }
                     }
                 },
                 {
                     '$project': { 
                          '_class' :  {
                              '$ifNull' : ['$_class','']
                          }
                     }
                 }
    ]
    

    【讨论】:

      猜你喜欢
      • 2016-04-26
      • 1970-01-01
      • 2015-02-15
      • 2019-10-14
      • 2015-03-20
      • 1970-01-01
      • 2020-04-09
      • 1970-01-01
      • 2019-08-01
      相关资源
      最近更新 更多