【问题标题】:How to efficiently select partial data from RDBMS tables in Pyspark如何有效地从 Pyspark 中的 RDBMS 表中选择部分数据
【发布时间】:2020-11-11 12:14:03
【问题描述】:

假设我有一个这样的员工表:

| employee_id | employee_name | department | created_at          | updated_at          |
|-------------|---------------|------------|---------------------|---------------------|
| 1           | Jessica       | Finance    | 2020-10-10 12:00:00 | 2020-10-10 12:00:00 |
| 2           | Michael       | IT         | 2020-10-10 15:00:00 | 2020-10-10 15:00:00 |
| 3           | Sheila        | HR         | 2020-10-11 17:00:00 | 2020-10-11 17:00:00 |
| ...         | ...           | ...        | ...                 | ...                 |
| 1000        | Emily         | IT         | 2020-10-20 20:00:00 | 2020-10-20 20:00:00 |

通常,我可以使用 JDBC 连接在 Pyspark 中批量处理数据,然后像这样写入 GCS:

df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://{ip_address}/{database}") \
    .option("dbtable", table_source) \
    .option("user", user_source) \
    .option("password", password_source) \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.write.parquet("gs://{bucket_name}/{target_directory}/")

当我像上面的代码一样创建 df 并使用 .load() 时,数据是否仍在数据库服务器中或 spark 从表中下载所有数据并将其移动到 spark 集群(假设数据库和 Spark 集群放在不同的服务器上)。

如果我需要在时间范围内获取特定数据,假设我需要 created_at > 2020-10-15 00:00:00 的数据

下面的代码够吗?因为我发现当数据大小超过 25 GB 时它真的很慢

df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://{ip_address}/{database}") \
    .option("dbtable", table_source) \
    .option("user", user_source) \
    .option("password", password_source) \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.createOrReplaceTempView("get_specific_data")

get_specific_data = spark.sql('''
                        SELECT employee_id, employee_name, department, created_at, updated_at
                        FROM get_specific_data 
                        WHERE created_at > '2020-10-15 00:00:00'
                        '''

get_specific_data.write.parquet("gs://{bucket_name}/{target_directory}/")

如果我知道需要按 created_date 列(或任何其他列、ID 或其他内容)检索哪些数据,我的问题更像是如何有效地获取 Pyspark 中的特定数据。我需要spark sql吗?或使用其他工具? (为了每天对数据进行批处理)

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    事实证明,如果我在 table_source 中仅指定表名,它会将所有数据加载到 spark 集群中。

    要选择我需要的特定数据,我可以使用这样的东西:

    last_update = '2020-10-15 00:00:00'
    table_source = "(SELECT employee_id, employee_name, department, created_at, updated_at " \
                   "FROM get_specific_data WHERE created_at > '{0}') t1".format(last_update)
    
    df = spark.read.format("jdbc") \
        .option("url", "jdbc:postgresql://{ip_address}/{database}") \
        .option("dbtable", table_source) \
        .option("user", user_source) \
        .option("password", password_source) \
        .option("driver", "org.postgresql.Driver") \
        .load()
    
    # And then write the data to parquet file
    get_specific_data.write.parquet("gs://{bucket_name}/{target_directory}/")
    

    【讨论】:

    • 其实是option("query", table_source)
    猜你喜欢
    • 2015-11-16
    • 1970-01-01
    • 1970-01-01
    • 2017-04-08
    • 1970-01-01
    • 2021-09-23
    • 2013-10-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多