【问题标题】:How to parallelize several apache spark rdds?如何并行化几个 apache spark rdds?
【发布时间】:2015-06-29 11:45:27
【问题描述】:

我有下一个代码:

sc.parquetFile("some large parquet file with bc").registerTempTable("bcs")
sc.parquetFile("some large parquet file with imps").registerTempTable("imps")
val bcs = sc.sql("select * from bcs")
val imps = sc.sql("select * from imps")

我想做:

bcs.map(x => wrapBC(x)).collect
imps.map(x => wrapIMP(x)).collect

但是当我这样做时,它不是异步运行的。我可以用 Future 来做,就像这样:

val bcsFuture = Future { bcs.map(x => wrapBC(x)).collect }
val impsFuture = Future { imps.map(x => wrapIMP(x)).collect }
val result = for {
  bcs <- bcsFuture
  imps <- impsFuture
} yield (bcs, imps)
Await.result(result, Duration.Inf) //this return (Array[Bc], Array[Imp])

我想在没有 Future 的情况下这样做,我该怎么做?

【问题讨论】:

  • 您能否进一步解释一下您使用 wrapBC() 或 wrapIMP() 所做的事情以及您使用 Future 实现的目标?
  • @hnahak 与 wrapBC 和 wrapIMP 我将 sql.Row 表示为 BC 和 IMP 对象。当我使用 Future 时,bcs.map(x => wrapBC(x)).collect 和 imps.map(x => wrapIMP(x)).collect 在 spark 上异步运行,否则按顺序运行。
  • 您的目标是加快计算速度,还是其他? map 已经可以并行化了 - 为什么要并行运行两个 map 调用?
  • @stholzm 是的。如果我使用 Future,那么这段代码的运行速度会快 20%。
  • @stholzm 我有两个大型镶木地板文件:bc 和 imp。

标签: scala apache-spark apache-spark-sql


【解决方案1】:

更新这最初是在问题更新之前编写的。鉴于这些更新,我同意 @stholzm's answer 在这种情况下使用 cartesian


确实存在数量有限的操作,它们将为RDD[A] 生成FutureAction[A] 并在后台执行。这些在AsyncRDDActions 类上可用,只要您导入SparkContext._,任何RDD 都可以根据需要隐式转换为AysnchRDDAction。对于您的具体代码示例:

bcs.map(x => wrapBC(x)).collectAsync
imps.map(x => wrapIMP(x)).collectAsync

除了在后台评估 DAG 以采取行动之外,生成的 FutureAction 还具有 cancel 方法来尝试提前结束处理。

警告

这可能不会像您认为的那样。 如果您的目的是从两个来源获取数据,然后将它们组合起来,您更有可能想要加入或分组 RDD。为此,您可以查看 PairRDDFunctions 中可用的函数,通过隐式转换再次在 RDD 上可用。

如果目的不是让数据图交互,那么到目前为止,根据我的经验,并发运行批处理可能只会减慢两者的速度,尽管这可能是集群配置方式的结果。如果资源管理器被设置为以 FIFO 顺序让每个执行阶段垄断集群(我相信独立和 YARN 模式的默认设置;我不确定 Mesos),那么每个异步收集都会与每个其他人为那个垄断,运行他们的任务,然后再次竞争下一个执行阶段。

将此与使用Future 来包装对下游服务或数据库的阻塞调用进行比较,例如,相关资源是完全独立的,或者通常有足够的资源容量来并行处理多个请求而不会发生争用。

【讨论】:

    【解决方案2】:

    更新:我误解了这个问题。想要的结果不是笛卡尔积Array[(Bc, Imp)]

    但我认为单个 map 调用需要多长时间并不重要,因为只要您添加其他转换,Spark 就会尝试以有效的方式组合它们。只要您只在 RDD 上链接转换,数据就不会发生任何事情。当您最终应用 action 时,执行引擎会找到一种方法来生成请求的数据。

    所以我的建议是不要过多考虑中间步骤,尽可能避免使用collect,因为它会将所有数据提取到驱动程序中。


    您似乎正在自己构建笛卡尔积。改用cartesian

    val bc = bcs.map(x => wrapBC(x))
    val imp = imps.map(x => wrapIMP(x))
    val result = bc.cartesian(imp).collect
    

    请注意,collect 在最终 RDD 上调用,而不再在中间结果上调用。

    【讨论】:

    • cartesian 返回 Array[(Bc, Imp)],如果我添加另一个 RDD,像这样:bc.cartesian(imp).cartesian(another).collect。它会返回我 Array[(Bc, Imp), Another)]。但是,笛卡尔在这种情况下工作得非常慢,因为 bc 可能包含数千行,但 imp 可能包含超过百万行。你怎么看?
    • cartesian 实际上会返回 RDD,在调用 collect 之类的操作之前,您可以在其上链接其他转换。在您的情况下,您可能希望在 collect 之前使用 filter。或者看看提到的PairRDDFunctions@hayden.sikh,也许有一个更接近你想要实现的转变。这个想法是建立一个 RDD 转换链并且只收集最终结果。
    • @Leonard:我还不太清楚你的目标是什么。无论如何,两个大型数据集的笛卡尔积显然是一项昂贵的操作。想想你为什么需要它。也许只有然后加入这两个表?有专门的 RDD 转换函数可以更好地为您服务。
    • 我的目标是获取所有 Bc 的数组和所有 Imp 的数组。如果我使用cartesian 大约需要 2.2 分钟,但如果我使用 Future 它大约需要 1.1 分钟。
    • 没关系,如果我们在谈论避免collect。因为如果我使用val bcs = sc.sql("select * from bcs where id = 1")val imps = sc.sql("select * from imps where id = 1") 然后同时使用collect,如果我不使用Future 也会很慢。你怎么看?
    【解决方案3】:

    您可以使用union 来解决这个问题。例如:

    bcs.map(x => wrapBC(x).asInstanceOf[Any])
    imps.map(x => wrapIMP(x).asInstanceOf[Any])
    
    val result = (bcs union imps).collect()
    val bcsResult = result collect { case bc: Bc => bc }
    val impsResult = result collect { case imp: Imp => imp }
    

    如果要使用 sortBy 或其他操作,可以使用 trait 或 main 类的继承。

    【讨论】:

      猜你喜欢
      • 2015-06-04
      • 2017-03-31
      • 2017-09-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-04-08
      • 2015-12-16
      • 2016-09-27
      相关资源
      最近更新 更多