【发布时间】:2018-02-13 18:01:55
【问题描述】:
让我们假设我们必须在过滤后重新分区数据集或获得度并行度。
我们如何执行动态重新分区而不是手动调整分区数量?
注意 - 寻找 RDD、数据框和数据集的解决方案。
【问题讨论】:
标签: apache-spark apache-spark-sql
让我们假设我们必须在过滤后重新分区数据集或获得度并行度。
我们如何执行动态重新分区而不是手动调整分区数量?
注意 - 寻找 RDD、数据框和数据集的解决方案。
【问题讨论】:
标签: apache-spark apache-spark-sql
您可以使用 repartition(colname) 或 partitionBy() 对数据集进行动态分区。
例如,如果您的数据集如下所示
create table sensor_data (
sensor_id bigint,
temp float,
region_id string,
state string,
country string
) partition by ( day string)
如果您想针对特定日期进行区域计算,
val sensor_data = spark.sql("select * from sensor_data where day='2018-02-10')
val results = sensor_data.
repartition(col("region_id")).
mapPartitions( eventIter => {
processEvent(eventIter).iterator
})
case Event(sensor_id: String, country: String, max_temp: float)
def processEvent(evtIter: Iterator[Row]) : List[Event] = {
val maxTempEvents = ListBuffer[Event]()
while (evtIter.hasNext) {
val evt = evtIter.next()
// do your calculation and add results to maxTempEvents list
}
maxTempEvents
}
希望这会有所帮助。
谢谢 拉维
【讨论】: