【问题标题】:How to partition Spark RDD when importing Postgres using JDBC?使用 JDBC 导入 Postgres 时如何对 Spark RDD 进行分区?
【发布时间】:2017-01-28 14:34:20
【问题描述】:

我正在将 Postgres 数据库导入 Spark。我知道我可以在导入时进行分区,但这需要我有一个数字列(我不想使用 value 列,因为它到处都是并且不维护顺序):

df = spark.read.format('jdbc').options(url=url, dbtable='tableName', properties=properties).load()
df.printSchema()

root
 |-- id: string (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- key: string (nullable = false)
 |-- value: double (nullable = false)

相反,我将数据帧转换为 rdd(枚举元组)并尝试对其进行分区:

rdd = df.rdd.flatMap(lambda x: enumerate(x)).partitionBy(20)

请注意,我使用了20,因为我的集群中有 5 个工作人员,每个工作人员都有一个核心,而5*4=20

不幸的是,以下命令仍然需要很长时间才能执行:

result = rdd.first()

因此我想知道我上面的逻辑是否有意义?我做错什么了吗?从 Web GUI 来看,似乎没有使用工人:

【问题讨论】:

    标签: postgresql jdbc apache-spark pyspark rdd


    【解决方案1】:

    既然您已经知道可以按数字列进行分区,这可能是您应该做的。这是诀窍。首先让我们找到一个最小和最大 epoch:

    url = ...
    properties = ...
    
    min_max_query = """(
        SELECT
            CAST(min(extract(epoch FROM timestamp)) AS bigint), 
            CAST(max(extract(epoch FROM timestamp)) AS bigint)
        FROM tablename
    ) tmp"""
    
    min_epoch, max_epoch = spark.read.jdbc(
        url=url, table=min_max_query, properties=properties
    ).first()
    

    并使用它来查询表:

    numPartitions = ...
    
    query = """(
        SELECT *, CAST(extract(epoch FROM timestamp) AS bigint) AS epoch
        FROM tablename) AS tmp"""
    
    spark.read.jdbc(
        url=url, table=query,
        lowerBound=min_epoch, upperBound=max_epoch + 1, 
        column="epoch", numPartitions=numPartitions, properties=properties
    ).drop("epoch")
    

    由于这会将数据拆分为相同大小的范围,因此对数据倾斜相对敏感,因此您应谨慎使用。

    您还可以提供一个不相交的谓词列表作为predicates 参数。

    predicates= [
        "id BETWEEN 'a' AND 'c'",
        "id BETWEEN 'd' AND 'g'",
        ...   # Continue to get full coverage an desired number of predicates
    ]
    
    spark.read.jdbc(
        url=url, table="tablename", properties=properties, 
        predicates=predicates
    )
    

    后一种方法更加灵活,可以解决数据分布不均匀的某些问题,但需要更多关于数据的知识。

    使用partitionBy首先获取数据,然后执行完全洗牌以获得所需数量的分区,因此相对而言代价高昂。

    【讨论】:

    • 酷,我不知道我们有这些选项。但是,尽管rdd.getNumPartitions() 在按纪元分区时按预期返回了20,但我仍然停留在rdd.first() 步骤上并且没有使用从站。我应该对此提出另一个问题。谢谢
    • 第一种方法将数据划分为统一的大小范围。如果数据本身不是均匀分布的,这可能会导致偏差。第二种方法更灵活,但需要一些有关数据分布的知识。
    • 对于我的数据,按时间戳划分应该是相当统一的。有什么方法可以在不执行另一次详尽迭代的情况下获得每个分区的大小?我现在正在映射分区并计数,但这需要很长时间。
    • 您可以将范围拆分为统一的部分,并直接针对数据库提交每个范围的查询。
    • 真正有用的答案!
    猜你喜欢
    • 2022-10-13
    • 1970-01-01
    • 1970-01-01
    • 2014-08-31
    • 1970-01-01
    • 2017-08-02
    • 2017-05-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多