【问题标题】:How can I parallelize a for loop in spark with scala?如何使用scala并行化spark中的for循环?
【发布时间】:2016-06-17 21:53:04
【问题描述】:

例如,我们有一个 parquet 文件,其中包含 2000 个股票代码在过去 3 年的收盘价,我们想要计算每个代码的 5 天移动平均线。

所以我创建了一个 spark SQLContext 然后

val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()

要获取符号列表,

val symbols = marketData.select("SYMBOL").distinct().collect()

这是for循环:

for (symbol <- symbols) {
  marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}

显然,在 spark 上执行 for 循环很慢,每个小结果的 save() 也会减慢进程(我尝试在 for 循环之外定义一个 var result 并合并所有输出以进行 IO 操作在一起,但我得到了一个stackoverflow异常),那么我怎样才能并行化for循环并优化IO操作?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    您编写的程序在驱动程序(“主”)火花节点中运行。只有在并行结构 (RDD) 上进行操作时,才能并行化此程序中的表达式。

    试试这个:

    marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg)  }.foreach{ case (symbol,averages) => averages.save() }
    

    其中symbolize 接受符号 x 天的行并返回一个元组(符号,天)。

    【讨论】:

    • 感谢您的回答。但是,marketdata 包含所有市场数据(2000 品种 × 900 天 = 1800000 行),如果我们在这个 rdd 上 sliding(5) 没有 filter(symbol) 似乎会得到关于移动平均线的错误结果?我说清楚了吗?
    • 感谢您的耐心等待。据我所知,如果我们在map(symbolize) 返回的rdd 上有symbolize 之类的{ row =&gt; (row.getAs[String]("SYMBOL"), row) }reduceByKey,我们将不得不reduceByKey{ case (row_x, row_y) =&gt; ...} 而不是reduceByKey{ case (symbol, days) =&gt; ...},最后,我groupByKey() on map(symbolize) 返回的 rdd 和 mapValues(x =&gt; x.sliding(5).map(makeAvg)).save() 并且有效。再次感谢您的帮助!
    【解决方案2】:

    对于答案的第一部分,我不同意 Carlos。该程序不在驱动程序(“主”)中运行。

    循环确实是按顺序运行的,但是对于每个符号的执行:

    marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
    

    是并行完成的,因为markedData 是一个 Spark DataFrame 并且它是分布式的。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-08-11
      • 1970-01-01
      • 2020-07-20
      • 1970-01-01
      相关资源
      最近更新 更多