【问题标题】:Converting Dataframe to RDD reduces partitions将 Dataframe 转换为 RDD 减少了分区
【发布时间】:2016-12-23 12:04:17
【问题描述】:

在我们的代码中,Dataframe 被创建为:

DataFrame DF = hiveContext.sql("select * from table_instance");

当我将数据帧转换为 rdd 并尝试获取其分区数时

RDD<Row> newRDD = Df.rdd();
System.out.println(newRDD.getNumPartitions());

它将分区的数量减少到 1(1 打印在控制台中)。最初我的数据框有 102 个分区。

更新:

在阅读时,我重新分配了数据框:

DataFrame DF = hiveContext.sql("select * from table_instance").repartition(200);

然后转换为 rdd ,所以它只给了我 200 个分区。 有没有

JavaSparkContext

在这方面有什么作用吗?当我们将数据帧转换为 rdd 时,默认最小分区标志是否也在 spark 上下文级别考虑?

更新:

我制作了一个单独的示例程序,其中我将完全相同的表读入数据帧并转换为 rdd。没有为 RDD 转换创建额外的阶段,并且分区计数也是正确的。我现在想知道我在主程序中做了什么不同。

如果我的理解有误,请告诉我。

【问题讨论】:

    标签: apache-spark spark-dataframe


    【解决方案1】:

    它基本上取决于hiveContext.sql()的实现。由于我是 Hive 新手,我的猜测是 hiveContext.sql 不知道 OR 无法拆分表中存在的数据。

    例如,当您从 HDFS 读取文本文件时,spark 上下文会考虑该文件使用的块数来确定分区。

    您对repartition 所做的操作是解决此类问题的明显方法。(注意:如果不使用适当的分区器,重新分区可能会导致随机操作,默认使用哈希分区器)

    引起您的怀疑,hiveContext 可能会考虑默认的最小分区属性。但是,依赖默认属性不会 解决你所有的问题。例如,如果您的 hive 表的大小增加,您的程序仍然使用默认的分区数。

    更新:重新分区期间避免随机播放

    定义您的自定义分区器:

    public class MyPartitioner extends HashPartitioner {
        private final int partitions;
        public MyPartitioner(int partitions) {
            super();
            this.partitions = partitions;
        }
        @Override
        public int numPartitions() {
            return this.partitions;
        }
    
        @Override
        public int getPartition(Object key) {
            if (key instanceof String) {
                return super.getPartition(key);
            } else if (key instanceof Integer) {
                return (Integer.valueOf(key.toString()) % this.partitions);
            } else if (key instanceof Long) {
                return (int)(Long.valueOf(key.toString()) % this.partitions);
            }
            //TOD ... add more types
        }
    }
    

    使用您的自定义分区器:

    JavaPairRDD<Long, SparkDatoinDoc> pairRdd = hiveContext.sql("select *   from table_instance")
    .mapToPair( //TODO ... expose the column as key)
    
    rdd = rdd.partitionBy(new MyPartitioner(200));
    //... rest of processing
    

    【讨论】:

    • 感谢@code 的回复。我一直在这个问题上停留了一段时间。来自 hivecontext.sql() 的分区被读取为 102。我对数据帧启动了计数操作,并了解到有 102 个任务正在启动,因此有 102 个分区。现在,当我进行重新分区时,会引起很多洗牌。我想根据某些列进行重新分区,请提出一种可以进行最小洗牌的重新分区技术
    • 您能解释一下它是什么类型的列以及它的值范围吗?
    • 这基本上是由用户决定的。可以是 string , long , bigint 任何数据类型的列都可以存在
    • 不能像这样重新分区数据框。其次 hivecontext.sql 返回一个数据框而不是一对 rdd 。为了使用它,我必须将数据帧转换为 rdd,然后在这个 rdd 上调用这个自定义分区器,并使用一个分区,从而启动一个任务。它会再次给我带来内存问题。
    • 由于我是hive的新手,所以我评论了将dataframe转换为rdd的部分。此外,一旦将数据框转换为 rdd,您需要在其上运行 mapToPair() 并选择任何列作为键,整行作为值。根据我的理解,上面的代码不会出现内存问题,因为 spark 可以管道分区过程。请运行代码并确认。
    猜你喜欢
    • 2020-01-24
    • 1970-01-01
    • 1970-01-01
    • 2016-05-29
    • 2021-10-06
    • 1970-01-01
    • 2017-06-13
    • 2021-06-29
    • 2023-03-13
    相关资源
    最近更新 更多