【问题标题】:How to find out items in each partition after repartition in Java Spark在Java Spark中重新分区后如何找出每个分区中的项目
【发布时间】:2020-06-12 23:21:55
【问题描述】:

我有一个包含几个整数值的 Java ArrayList。 我用 ArrayList 创建了一个 DataSet。 我使用 System.out.println(DF.javaRDD().getNumPartitions()); 并导致 1 个分区。 我想把数据分成3个分区。所以我使用了重新分区()。 我想在重新分区后找出每个分区中的项目数。

在 scala 中它是直截了当的。

DF.repartition(3).mapPartitions((it) => Iterator(it.length));

但相同的语法在 Java 中不起作用,因为长度函数在 Java 的迭代器接口中不可用。

我们应该如何解读mappartition函数?

mapPartitions(FlatMapFunction<java.util.Iterator<T>,U> f)

内部函数会接受哪些参数,返回类型是什么?

SparkSession sessn = SparkSession.builder().appName("RDD to DF").master("local").getOrCreate();
        List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
        Dataset<Integer> DF = sessn.createDataset(lst, Encoders.INT());
        System.out.println(DF.javaRDD().getNumPartitions()); 

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    试试这个-

       List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
            Dataset<Integer> DF = spark.createDataset(lst, Encoders.INT());
            System.out.println(DF.javaRDD().getNumPartitions());
            MapPartitionsFunction<Integer, Integer> f =
                    it -> ImmutableList.of(JavaConverters.asScalaIteratorConverter(it).asScala().length()).iterator();
            DF.repartition(3).mapPartitions(f,
                    Encoders.INT()).show(false);
            /**
             * 2
             * +-----+
             * |value|
             * +-----+
             * |6    |
             * |8    |
             * |6    |
             * +-----+
             */
    

    【讨论】:

    • ImmutableList.of(JavaConverters.asScalaIteratorConverter(it).asScala().length()).iterator() 您为 ImmutableList 导入了哪些包,其余的包? mapPartitions(FlatMapFunction,U> f) --> 你能解释一下内部函数的输入和内部函数的输出吗?
    • 对我来说似乎不起作用。在我们使用其中一些类和函数之前,我们是否需要包含来自 maven 存储库的 jar 和 jar?
    • 哪些类导致问题?
    • Dataset mappartDS = DF.repartition(3).mapPartitions(it-> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator(), Encoders.INT( )); --> 我稍微调整了你的代码。 asScala() 在我的 Java 环境中由于某种原因不可用。这给了我一个数据集。我还写了将其转换为 javaRDD。 JavaRDD mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it-> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator()); --> 这样就创建了一个 RDD。如何在 JavaSpark 中将 RDD 转换为 DF?
    • Dataset&lt;Integer&gt; DF = spark.createDataset(rdd, Encoders.INT())
    猜你喜欢
    • 1970-01-01
    • 2021-12-21
    • 2019-11-01
    • 1970-01-01
    • 2019-03-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多