【发布时间】:2018-03-20 20:33:00
【问题描述】:
我在 Hive 中存储了名为 ExampleData 的下表:
+--------+-----+---|
|Site_ID |Time |Age|
+--------+-----+---|
|1 |10:00| 20|
|1 |11:00| 21|
|2 |10:00| 24|
|2 |11:00| 24|
|2 |12:00| 20|
|3 |11:00| 24|
+--------+-----+---+
我需要能够按站点处理数据。不幸的是,按站点对其进行分区不起作用(有超过 10 万个站点,所有站点都具有相当少量的数据)。
对于每个站点,我需要分别选择 Time 列和 Age 列,并使用它来输入一个函数(理想情况下,我希望在执行程序上运行,而不是在驱动程序上运行)
我有一个关于我希望它如何工作的存根,但是这个解决方案只能在驱动程序上运行,所以它非常慢。我需要找到一种编写它的方法,以便它运行一个执行器级别:
// fetch a list of distinct sites and return them to the driver
//(if you don't, you won't be able to loop around them as they're not on the executors)
val distinctSites = spark.sql("SELECT site_id FROM ExampleData GROUP BY site_id LIMIT 10")
.collect
val allSiteData = spark.sql("SELECT site_id, time, age FROM ExampleData")
distinctSites.foreach(row => {
allSiteData.filter("site_id = " + row.get(0))
val times = allSiteData.select("time").collect()
val ages = allSiteData.select("ages").collect()
processTimesAndAges(times, ages)
})
def processTimesAndAges(times: Array[Row], ages: Array[Row]) {
// do some processing
}
我尝试在所有节点上广播 distinctSites,但这并没有成功。
这似乎是一个简单的概念,但我花了几天时间研究这个。我对 Scala/Spark 很陌生,如果这是一个荒谬的问题,我深表歉意!
非常感谢任何建议或提示。
【问题讨论】:
标签: scala apache-spark bigdata apache-spark-sql spark-dataframe