【发布时间】:2016-06-17 21:53:04
【问题描述】:
例如,我们有一个 parquet 文件,其中包含 2000 个股票代码在过去 3 年的收盘价,我们想要计算每个代码的 5 天移动平均线。
所以我创建了一个 spark SQLContext 然后
val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()
要获取符号列表,
val symbols = marketData.select("SYMBOL").distinct().collect()
这是for循环:
for (symbol <- symbols) {
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}
显然,在 spark 上执行 for 循环很慢,每个小结果的 save() 也会减慢进程(我尝试在 for 循环之外定义一个 var result 并合并所有输出以进行 IO 操作在一起,但我得到了一个stackoverflow异常),那么我怎样才能并行化for循环并优化IO操作?
【问题讨论】:
标签: scala apache-spark apache-spark-sql spark-dataframe