【发布时间】:2020-11-11 12:14:03
【问题描述】:
假设我有一个这样的员工表:
| employee_id | employee_name | department | created_at | updated_at |
|-------------|---------------|------------|---------------------|---------------------|
| 1 | Jessica | Finance | 2020-10-10 12:00:00 | 2020-10-10 12:00:00 |
| 2 | Michael | IT | 2020-10-10 15:00:00 | 2020-10-10 15:00:00 |
| 3 | Sheila | HR | 2020-10-11 17:00:00 | 2020-10-11 17:00:00 |
| ... | ... | ... | ... | ... |
| 1000 | Emily | IT | 2020-10-20 20:00:00 | 2020-10-20 20:00:00 |
通常,我可以使用 JDBC 连接在 Pyspark 中批量处理数据,然后像这样写入 GCS:
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://{ip_address}/{database}") \
.option("dbtable", table_source) \
.option("user", user_source) \
.option("password", password_source) \
.option("driver", "org.postgresql.Driver") \
.load()
df.write.parquet("gs://{bucket_name}/{target_directory}/")
当我像上面的代码一样创建 df 并使用 .load() 时,数据是否仍在数据库服务器中或 spark 从表中下载所有数据并将其移动到 spark 集群(假设数据库和 Spark 集群放在不同的服务器上)。
如果我需要在时间范围内获取特定数据,假设我需要 created_at > 2020-10-15 00:00:00 的数据
下面的代码够吗?因为我发现当数据大小超过 25 GB 时它真的很慢
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://{ip_address}/{database}") \
.option("dbtable", table_source) \
.option("user", user_source) \
.option("password", password_source) \
.option("driver", "org.postgresql.Driver") \
.load()
df.createOrReplaceTempView("get_specific_data")
get_specific_data = spark.sql('''
SELECT employee_id, employee_name, department, created_at, updated_at
FROM get_specific_data
WHERE created_at > '2020-10-15 00:00:00'
'''
get_specific_data.write.parquet("gs://{bucket_name}/{target_directory}/")
如果我知道需要按 created_date 列(或任何其他列、ID 或其他内容)检索哪些数据,我的问题更像是如何有效地获取 Pyspark 中的特定数据。我需要spark sql吗?或使用其他工具? (为了每天对数据进行批处理)
【问题讨论】:
标签: apache-spark pyspark