火花 >= 3.0
自 3.0 起,Spark 使用 Pandas / Arrow 提供 PySpark 特定的cogroup。一般语法如下:
left.cogroup(right).apply(f)
其中both 和right 是GroupedData 对象,f 是一个COGROUPED_MAP 用户定义函数,它接受两个Pandas DataFrames 并返回Pandas DataFrame
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas.core.frame import DataFrame as PandasDataFrame
@pandas_udf(schema)
def f(left: PandasDataFrame, right: PandasDataFrame) -> PandasDataFrame: ...
火花 >= 1.6
JVM KeyValueGroupedDataset 同时提供 Java
def cogroup[U, R](other: KeyValueGroupedDataset[K, U], f: CoGroupFunction[K, V, U, R], encoder: Encoder[R]): Dataset[R]
和斯卡拉
def cogroup[U, R](other: KeyValueGroupedDataset[K, U])(f: (K, Iterator[V], Iterator[U]) ⇒ TraversableOnce[R])(implicit arg0: Encoder[R]): Dataset[R]
然而,它适用于“强”类型的变体,而不是 Dataset[Row],并且是 highly unlikely 为您声明的目标(性能改进)做出贡献。
Spark (这部分继续有效,除了上面列出的小 API 添加)。
DataFrame 不提供任何与cogroup 等效的函数,并且复杂对象不是 Spark SQL 中的一等公民。复杂结构上可用的一组操作相当有限,因此通常您必须创建不简单的自定义表达式或使用 UDF 并支付性能损失。此外,Spark SQL 没有使用与普通 RDDs 相同的 join 逻辑。
关于 RDD。虽然存在cogroup 可能优于join 的边界情况,但通常情况并非如此,除非结果-> 完整数据集的笛卡尔积。在 RDD 上的所有连接都使用 cogroup 后跟 flatMapValues 表示后,由于后一个操作是本地操作,唯一真正的开销是创建输出元组。
如果您的表仅包含原始类型,您可以通过首先使用 collect_list 聚合列来模仿类似共同组的行为,但我不希望这里有任何性能提升。