【问题标题】:How to perform group and count using map and reduce function in Spark 2.3.1如何在 Spark 2.3.1 中使用 map 和 reduce 函数进行分组和计数
【发布时间】:2019-01-21 19:14:16
【问题描述】:

我是一个新的火花,我正在尝试使用以下火花函数执行分组和计数:

 Dataset<Row> result =  dataset
       .groupBy("column1", "column2")
       .count();

但我读到here 说使用 group by 不是一个好主意,因为它没有组合器,这反过来会影响 spark 作业的运行时效率。 相反,应该使用 reduceByKey 函数进行聚合操作。

所以我尝试使用reduceByKey 函数,但它不适用于dataset。相反,数据集使用reduce(ReduceFunction&lt;Row&gt; func)

由于找不到使用reduce函数执行分组和计数的示例,我尝试将其转换为JavaRDD并使用reduceByKey

//map each row to 1 and then group them by key 
JavaPairRDD<String[], Integer> mapOnes;
            try {
                 mapOnes = dailySummary.javaRDD().mapToPair(
                        new PairFunction<Row, String[], Integer>() {
                            @Override
                            public Tuple2<String[], Integer> call(Row t) throws Exception {
                                return new Tuple2<String[], Integer>(new String[]{t.getAs("column1"), t.getAs("column2")}, 1);
                            }   
                });
            }catch(Exception e) {
                log.error("exception in mapping ones: "+e);
                throw new Exception();
            }


        JavaPairRDD<String[], Integer> rowCount;
        try {
            rowCount = mapOnes.reduceByKey(
                new Function2<Integer, Integer, Integer>() {

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1+v2;
                    }
                });
        }catch(Exception e) {
            log.error("exception in reduce by key: "+e);
            throw new Exception();
        }

但这也给了org.apache.spark.SparkException: Task not serializablemapToPair函数的异常。

谁能建议一种更好的方法来使用数据集的reducemap 函数进行分组和执行计数。

感谢任何帮助。

【问题讨论】:

  • 如果我能给你一个小费,请使用 SCALA。 Java 不在它所在的位置,除了可能是 KAFKA。
  • “但我在这里读到,使用 group by 不是一个好主意,因为它没有组合器” - DataFrame / Dataset groupBy behaviour/optimization

标签: java apache-spark mapreduce dataset


【解决方案1】:

您添加的链接中的 groupBy 指的是 RDD。在 RDD 语义中,groupBy 基本上会根据 key 打乱所有数据,即将与 key 相关的所有值带到一个地方。

这就是为什么建议使用 reduceByKey 的原因,因为 reduceByKey 首先在每个分区上执行 reduce 操作,并且只有减少后的值被打乱,这意味着更少的流量(并防止将所有内容都放到一个分区时出现内存不足的问题)。

在数据集中,groupBy 的行为不同。它不会将数据集作为返回的对象,而是提供 KeyValueGroupedDataset 对象。当您确实依赖此对象(或更通用的 agg)时,它基本上定义了一个与 reduceByKey 非常相似的 reducer。

这意味着不需要单独的reduceByKey方法(数据集groupby实际上是reduceByKey的一种形式)。

坚持原来的groupBy(...).count(...)

【讨论】:

  • 但是有没有其他比 group by 更有效的分组方式?
  • 取决于您的需要。对于几乎所有的聚合,这将是最有效的方式。我会说使用数据框语义而不是数据集语义通常可以提高性能。在您的情况下,您在幕后使用数据框语义,因为您按列而不是某些映射函数进行分组。
【解决方案2】:

基于包含 2 列的数据集,其中一列是县名,另一列是美国的州名。

期望的输出:

reduce()
Autauga County, Alabama, Baldwin County, Alabama, Barbour County, Alabama, Bibb County, Alabama, Blount County, Alabama, Bullock County, Alabama, Butler County, Alabama, Calhoun County, Alabama, Chambers County, Alabama, Cherokee County, Alabama, Chilton County,
…

用法:

System.out.println("reduce()");
String listOfCountyStateDs = countyStateDs
    .reduce(
        new CountyStateConcatenatorUsingReduce());
System.out.println(listOfCountyStateDs);

实施:

 private final class CountyStateConcatenatorUsingReduce
      implements ReduceFunction<String> {
    private static final long serialVersionUID = 12859L;

    @Override
    public String call(String v1, String v2) throws Exception {
      return v1 + ", " + v2;
    }
  }

但是,您必须编写自己的逻辑,这可能很耗时,而且您更喜欢使用 groupBy...

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-09-09
    • 2021-09-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-27
    • 2018-10-05
    相关资源
    最近更新 更多