【问题标题】:Java spark groupByKey with key1 and do aggregateByKey with key2 on groupedRDDJava spark groupByKey 和 key1 并在 groupedRDD 上使用 key2 做聚合
【发布时间】:2017-04-21 03:18:06
【问题描述】:

我正在尝试做一个简单的 java spark 应用程序,它执行以下操作

输入数据csv格式:key1,key2,data1,data2

基本上我在这里要做的是,

首先我通过 key1 映射每一行,然后对该 rdd 执行 groupByKey 操作。

JavaRDD<String> viewRdd = sc.textFile("testfile.csv", 1);
JavaPairRDD<String, String> customerIdToRecordRDD = viewRdd
    .mapToPair(w -> new Tuple2<String, String>(w.split(",")[0], w));
JavaPairRDD<String, Iterable<String>> groupedByKey1RDD = customerIdToRecordRDD.groupByKey();
System.out.println(customerIdToRecordGropedRDD.count());

现在我的问题是,我需要在 groupedByKey1RDD 的每个组上使用 key2 进行聚合。有什么方法可以将 Iterable 转换为 RDD 吗?还是我在这里遗漏了什么。我是新手,任何帮助都会

示例输入和预期输出:

id_1,time0,10,10

id_2,time1,0,10

id_1,time1,11,10

id_1,time0,1,10

id_2,time1,10,10

输出按第1列分组,然后按第2列聚合(聚合逻辑是简单地将column3和column相加):

id_1 : time0 : { sum1 : 11, sum2 : 20} ,
       time1 : { sum1 : 11, sum2 : 10}

id_2 : time1 : { sum1 : 10, sum2 : 20} 

【问题讨论】:

  • 能否提供 csv 数据样本和预期输出?
  • @abaghel 添加了示例输入和输出
  • 你使用的spark版本是什么?你想使用RDD吗,因为这很容易解决,并且使用Dataframe更易于管理。

标签: java apache-spark


【解决方案1】:

这是使用 Spark 2.0 和 Dataframe 的解决方案。如果您仍想使用 RDD,请告诉我。

public class SparkGroupBySample {
    public static void main(String[] args) {
    //SparkSession
    SparkSession spark = SparkSession
            .builder()
            .appName("SparkGroupBySample")
            .master("local")
            .getOrCreate();     
    //Schema
    StructType schema = new StructType(new StructField[] { 
            new StructField("key1", DataTypes.StringType, true, Metadata.empty()),
            new StructField("key2", DataTypes.StringType, true, Metadata.empty()),
            new StructField("data1", DataTypes.IntegerType, true, Metadata.empty()),
            new StructField("data2", DataTypes.IntegerType, true, Metadata.empty())});
    //Read csv
    Dataset<Row> dataSet = spark.read().format("csv").schema(schema).option("header", "true").option("delimiter", ",").load("c:\\temp\\sample.csv");
    dataSet.show();     
    //groupBy and aggregate
    Dataset<Row> dataSet1 = dataSet.groupBy("key1","key2").sum("data1","data2").toDF("key1","key2","sum1","sum2");
    dataSet1.show();
    //stop
    spark.stop();
   }
}

这是输出。

+----+-----+----+----+
|key1| key2|sum1|sum2|
+----+-----+----+----+
|id_1|time1|  11|  10|
|id_2|time1|  10|  20|
|id_1|time0|  11|  20|
+----+-----+----+----+

【讨论】:

  • 非常感谢@abaghel。我之前没有使用过Dataframes,所以对它知之甚少。我正在尝试做更复杂的用户定义的聚合方法,而不仅仅是 sum,我相信我们也可以将其与数据帧一起使用。但如果可能的话,我想知道我们如何使用 RDD 实现相同的目标。我正在使用 spark 2.0
  • 使用 Spark 2.0,您应该使用 Dataframes。是的,您可以通过将内置函数传递给 agg 方法来进行不同类型的聚合,或者您可以调用用户定义的函数。请检查“org.apache.spark.sql.functions”。示例 - dataSet.groupBy("c1").agg(org.apache.spark.sql.functions.collect_list("c2"));
  • 我试图用一些额外的派生值聚合并创建一个新的 pojo。这可能使用数据框吗?谢谢!
  • 基本上我正在寻找类似于此的输出 { map of key1 : { map of key2 : [ list of aggregated pojos ]} } .
  • 如果您有来自输入数据的 Dataframe,您可以调用 map 或 flatmap 函数来创建所需的输出。您甚至可以通过在输入数据上创建临时表来运行 SQL 查询。详情请查看spark.apache.org/docs/latest/sql-programming-guide.html
猜你喜欢
  • 2017-04-25
  • 1970-01-01
  • 2015-05-24
  • 2018-10-31
  • 2017-05-01
  • 2023-04-05
  • 2017-11-02
  • 2013-10-21
  • 2010-12-21
相关资源
最近更新 更多