【问题标题】:How to extract only specific rows from mongodb using Pyspark?如何使用 Pyspark 从 mongodb 中仅提取特定行?
【发布时间】:2021-02-18 12:52:08
【问题描述】:

我正在从 mongodb 集合中提取数据并使用 Spark python 代码将其写入 bigquery 表。

下面是我的代码 sn-p:

df = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri","mongodb_url")\
    .option("database","db_name")\
    .option("collection", "collection_name")\
    .load()

df.write \
  .format("bigquery") \
  .mode("append")\
  .option("temporaryGcsBucket","gcs_bucket") \
  .option("createDisposition","CREATE_IF_NEEDED")\
  .save("bq_dataset_name.collection_name")

这将从 mongodb 集合中提取所有数据。但我只想提取满足条件的文档(如 sql 查询中的 where 条件)。

我发现的一种方法是读取数据帧中的整个数据并在该数据帧上使用过滤器,如下所示:

df2 = df.filter(df['date'] < '12-03-2020 10:12:40')

但由于我的源 mongo 集合有 8-10 Gb 的数据,我不能每次都从 mongo 读取整个数据。

如何在使用 spark.read 从 mongo 读取数据时使用过滤?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql spark-streaming


    【解决方案1】:

    您是否尝试过在应用过滤器后检查您的整个数据是否正在被扫描?

    假设您使用带有 spark 的 official connector,则支持过滤器/谓词下推。

    “谓词下推”是对连接器和 Catalyst 优化器自动将谓词“下推”到数据 节点。目标是最大化过滤掉的数据量 在将数据加载到 Spark 的节点内存之前的数据存储端。

    连接器自动下推到 MongoDB 的谓词有两种:

    • select 子句(投影)作为$project
    • filter子句内容(where)作为一个或多个$match

    你可以找到这个over here的支持代码。

    注意: 关于嵌套字段的谓词下推存在一些问题,但这是 spark 本身的一个错误,也会影响其他来源。这已在 Spark 3.x 中修复。检查this答案。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-01-23
      • 2011-03-05
      相关资源
      最近更新 更多