【问题标题】:Spark group by Key and partitioning the dataSpark 按 Key 分组并对数据进行分区
【发布时间】:2021-02-05 21:03:27
【问题描述】:

我有一个包含以下格式数据的大型 csv 文件。

cityId1,name,address,.......,zip

cityId2,name,address,.......,zip

cityId1,name,address,.......,zip

........

cityIdN,name,address,.......,zip

我正在对上述 csv 文件执行以下操作:

  1. 按cityId作为key,资源列表作为value分组

    df1.groupBy($"cityId").agg(collect_list(struct(cols.head, cols.tail: _*)) as "resources")

  2. 改成jsonRDD

    val jsonDataRdd2 = df2.toJSON.rdd

  3. 遍历每个分区并按密钥上传到 s3

  • 由于业务逻辑限制(其他服务如何从 S3 读取),我无法使用 dataframe partitionby write

我的问题:

  • spark 分区的默认大小是多少?
  • 假设默认分区大小为 X MB,并且 dataFrame 中存在一条大记录,其键具有 Y MB 数据 (Y > X),在这种情况下会发生什么情况?
  • 在这种情况下,我是否需要担心在不同分区中拥有相同的密钥?

【问题讨论】:

标签: apache-spark apache-spark-sql


【解决方案1】:

回答您的问题:

  • 从二级存储(S3、HDFS)读取时,分区等于文件系统的块大小,128MB 或 256MB;但您可以立即重新分区 RDD,而不是数据帧。 (对于 JDBC 和 Spark Structured Streaming,分区的大小是动态的。)

  • 当应用“宽转换”并重新分区时,分区的数量和大小最有可能发生变化。给定分区的大小具有最大值。在 Spark 2.4.x 中,分区大小增加到 8GB。因此,如果任何转换(例如 collect_list 与 groupBy 的组合)生成超过此最大大小,您将收到错误并且程序中止。因此,您需要明智地进行分区,或者在您的情况下有足够数量的分区进行聚合 - 请参阅 spark.sql.shuffle.partitions 参数。

  • Spark 处理的并行模型依赖于通过散列、范围分区等分配的“键”分配到一个且只有一个分区 - 洗牌。所以,遍历一个分区foreachPartition,mapPartitions没有问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-03-13
    • 2019-12-17
    相关资源
    最近更新 更多