【问题标题】:Cogroup on Spark DataFramesSpark DataFrames 上的 Cogroup
【发布时间】:2019-11-28 15:10:32
【问题描述】:

我有 2 个要根据关联键合并的大型 DataFrame。使用join 需要更长的时间才能完成任务。

我发现在 Apache Spark 中使用 cogroup 优于 Joins。任何人都可以指出如何在 DataFrames 上使用 cogroup 或建议一种更好的方法来合并 2 个大型 DataFrames。

谢谢

【问题讨论】:

    标签: apache-spark dataframe apache-spark-sql


    【解决方案1】:

    火花 >= 3.0

    自 3.0 起,Spark 使用 Pandas / Arrow 提供 PySpark 特定的cogroup。一般语法如下:

    left.cogroup(right).apply(f)
    

    其中bothrightGroupedData 对象,f 是一个COGROUPED_MAP 用户定义函数,它接受两个Pandas DataFrames 并返回Pandas DataFrame

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pandas.core.frame import DataFrame as PandasDataFrame
    
    @pandas_udf(schema)
    def f(left: PandasDataFrame, right: PandasDataFrame) -> PandasDataFrame: ...
    

    火花 >= 1.6

    JVM KeyValueGroupedDataset 同时提供 Java

    def cogroup[U, R](other: KeyValueGroupedDataset[K, U], f: CoGroupFunction[K, V, U, R], encoder: Encoder[R]): Dataset[R] 
    

    和斯卡拉

    def cogroup[U, R](other: KeyValueGroupedDataset[K, U])(f: (K, Iterator[V], Iterator[U]) ⇒ TraversableOnce[R])(implicit arg0: Encoder[R]): Dataset[R] 
    

    然而,它适用于“强”类型的变体,而不是 Dataset[Row],并且是 highly unlikely 为您声明的目标(性能改进)做出贡献。

    Spark (这部分继续有效,除了上面列出的小 API 添加)。

    DataFrame 不提供任何与cogroup 等效的函数,并且复杂对象不是 Spark SQL 中的一等公民。复杂结构上可用的一组操作相当有限,因此通常您必须创建不简单的自定义表达式或使用 UDF 并支付性能损失。此外,Spark SQL 没有使用与普通 RDDs 相同的 join 逻辑。

    关于 RDD。虽然存在cogroup 可能优于join 的边界情况,但通常情况并非如此,除非结果-> 完整数据集的笛卡尔积。在 RDD 上的所有连接都使用 cogroup 后跟 flatMapValues 表示后,由于后一个操作是本地操作,唯一真正的开销是创建输出元组。

    如果您的表仅包含原始类型,您可以通过首先使用 collect_list 聚合列来模仿类似共同组的行为,但我不希望这里有任何性能提升。

    【讨论】:

    • 在很多情况下cogroup 优于joinCogroup 是一个具有groupBy 语义的连接运算符,将数据放在上下文中非常有用。复杂对象在 Spark 中还不是一等公民,但它们在 impala 中通过消除读取时的连接需求而产生了巨大的性能优势。
    • @jwinandy 你能详细说明一下,因为我信任 zero323 cmets。
    猜你喜欢
    • 2016-04-26
    • 1970-01-01
    • 1970-01-01
    • 2021-11-05
    • 1970-01-01
    • 2017-10-13
    • 1970-01-01
    • 2015-12-11
    • 2016-04-11
    相关资源
    最近更新 更多