【问题标题】:Spark2 Dataframe/RDD process in groupsSpark2 Dataframe/RDD 进程分组
【发布时间】:2018-03-20 20:33:00
【问题描述】:

我在 Hive 中存储了名为 ExampleData 的下表:

+--------+-----+---|
|Site_ID |Time |Age|
+--------+-----+---|
|1       |10:00| 20|
|1       |11:00| 21|
|2       |10:00| 24|
|2       |11:00| 24|
|2       |12:00| 20|
|3       |11:00| 24|
+--------+-----+---+

我需要能够按站点处理数据。不幸的是,按站点对其进行分区不起作用(有超过 10 万个站点,所有站点都具有相当少量的数据)。

对于每个站点,我需要分别选择 Time 列和 Age 列,并使用它来输入一个函数(理想情况下,我希望在执行程序上运行,而不是在驱动程序上运行)

我有一个关于我希望它如何工作的存根,但是这个解决方案只能在驱动程序上运行,所以它非常慢。我需要找到一种编写它的方法,以便它运行一个执行器级别:

// fetch a list of distinct sites and return them to the driver 
//(if you don't, you won't be able to loop around them as they're not on the executors)
val distinctSites = spark.sql("SELECT site_id FROM ExampleData GROUP BY site_id LIMIT 10")
.collect

val allSiteData = spark.sql("SELECT site_id, time, age FROM ExampleData")

distinctSites.foreach(row => {
    allSiteData.filter("site_id = " + row.get(0))
    val times = allSiteData.select("time").collect()
    val ages = allSiteData.select("ages").collect()
    processTimesAndAges(times, ages)
})

def processTimesAndAges(times: Array[Row], ages: Array[Row]) {
    // do some processing
}

我尝试在所有节点上广播 distinctSites,但这并没有成功。

这似乎是一个简单的概念,但我花了几天时间研究这个。我对 Scala/Spark 很陌生,如果这是一个荒谬的问题,我深表歉意!

非常感谢任何建议或提示。

【问题讨论】:

    标签: scala apache-spark bigdata apache-spark-sql spark-dataframe


    【解决方案1】:

    RDD API 提供了许多函数,可用于分组执行操作,从低级 repartition / repartitionAndSortWithinPartitions 开始,到许多 *byKey 方法(combineByKey、groupByKey、reduceByKey 等)结束。

    例子:

    rdd.map( tup => ((tup._1, tup._2, tup._3), tup) ).
      groupByKey().
      forEachPartition( iter => doSomeJob(iter) )
    

    在DataFrame中可以使用聚合函数,GroupedData类为最常用的函数提供了一些方法,包括count、max、min、mean和sum

    例子:

       val df = sc.parallelize(Seq(
          (1, 10.3, 10), (1, 11.5, 10),
          (2, 12.6, 20), (3, 2.6, 30))
        ).toDF("Site_ID ", "Time ", "Age")
    
    df.show()
    
    +--------+-----+---+
    |Site_ID |Time |Age|
    +--------+-----+---+
    |       1| 10.3| 10|
    |       1| 11.5| 10|
    |       2| 12.6| 20|
    |       3|  2.6| 30|
    +--------+-----+---+
    
    
        df.groupBy($"Site_ID ").count.show
    
    +--------+-----+
    |Site_ID |count|
    +--------+-----+
    |       1|    2|
    |       3|    1|
    |       2|    1|
    +--------+-----+
    

    注意:正如您提到的解决方案非常慢,您需要使用分区,在您的情况下范围分区是一个不错的选择。

    【讨论】:

    • 谢谢!正是 groupByKey() 把我带到了我需要的地方。非常感谢,也感谢您的快速回复。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-08-04
    • 1970-01-01
    • 2021-10-06
    • 2017-11-02
    • 2018-11-08
    • 2017-02-03
    • 2017-04-11
    相关资源
    最近更新 更多