【发布时间】:2015-06-29 11:45:27
【问题描述】:
我有下一个代码:
sc.parquetFile("some large parquet file with bc").registerTempTable("bcs")
sc.parquetFile("some large parquet file with imps").registerTempTable("imps")
val bcs = sc.sql("select * from bcs")
val imps = sc.sql("select * from imps")
我想做:
bcs.map(x => wrapBC(x)).collect
imps.map(x => wrapIMP(x)).collect
但是当我这样做时,它不是异步运行的。我可以用 Future 来做,就像这样:
val bcsFuture = Future { bcs.map(x => wrapBC(x)).collect }
val impsFuture = Future { imps.map(x => wrapIMP(x)).collect }
val result = for {
bcs <- bcsFuture
imps <- impsFuture
} yield (bcs, imps)
Await.result(result, Duration.Inf) //this return (Array[Bc], Array[Imp])
我想在没有 Future 的情况下这样做,我该怎么做?
【问题讨论】:
-
您能否进一步解释一下您使用 wrapBC() 或 wrapIMP() 所做的事情以及您使用 Future 实现的目标?
-
@hnahak 与 wrapBC 和 wrapIMP 我将 sql.Row 表示为 BC 和 IMP 对象。当我使用 Future 时,bcs.map(x => wrapBC(x)).collect 和 imps.map(x => wrapIMP(x)).collect 在 spark 上异步运行,否则按顺序运行。
-
您的目标是加快计算速度,还是其他?
map已经可以并行化了 - 为什么要并行运行两个map调用? -
@stholzm 是的。如果我使用 Future,那么这段代码的运行速度会快 20%。
-
@stholzm 我有两个大型镶木地板文件:bc 和 imp。
标签: scala apache-spark apache-spark-sql