【问题标题】:Scala distributed execution of function objectsScala分布式执行函数对象
【发布时间】:2014-11-05 17:06:28
【问题描述】:

给定以下函数对象,

val f : Int => Double = (i:Int) => i + 0.1

val g1 : Double => Double = (x:Double) => x*10

val g2 : Double => Double = (x:Double) => x/10

val h : (Double,Double) => Double = (x:Double,y:Double) => x+y

例如3个远程服务器或节点(IP xxx.xxx.xxx.1、IP 2和IP 3),如何分配这个程序的执行,

val fx = f(1)
val g1x = g1( fx )
val g2x = g2( fx )
val res = h ( g1x, g2x )

这样

  • fx 在 IP 1 中计算,
  • g1x 在 IP 2 中计算,
  • g2x 在 IP 3 中计算,
  • res 在 IP 1 中计算

Scala Akka 或 Apache Spark 可以提供一个简单的方法来解决这个问题吗?

更新

  • @pkinsky 建议的 RPC(远程过程调用)Finagle 可能是一个可行的选择。
  • 将负载平衡策略视为一种选择执行节点的机制,至少任何可用的免费节点策略

【问题讨论】:

  • 问题,你为什么使用函数 vals 而不是 defs?
  • @ElectricCoffee 没有什么特别的原因,这个问题可以很好地说明方法,如果它导致一个好的解决方案真的是一个选择:)
  • 你想序列化你的函数,将它们发送到远程服务器,让远程服务器执行它们,序列化结果,然后返回给你吗?还是您只需要一个 RPC 库?如果是第二个,请查看 twitter 的开源 Finagle 库。
  • @pkinsky 非常感谢您的想法,这很新颖,不确定每个选项的质量...
  • @enzyme 你几乎肯定想要第二个选项。查看 twitter 的主题介绍和分步分布式搜索引擎项目。 twitter.github.io/scala_school/finagle.htmltwitter.github.io/scala_school/searchbird.html

标签: scala akka rpc apache-spark finagle


【解决方案1】:

我可以代表 Apache Spark。它可以用下面的代码做你正在寻找的东西。但它不是为这种并行计算而设计的。它专为并行计算而设计,您还可以在许多机器上分布大量并行数据。所以这个解决方案看起来有点傻,例如我们在一台机器上分配一个整数(对于f(1))。

此外,Spark 旨在对所有数据运行相同的计算。所以并行运行g1()g2() 有点违背设计。 (如您所见,这是可能的,但并不优雅。)

// Distribute the input (1) across 1 machine.
val rdd1 = sc.parallelize(Seq(1), numSlices = 1)
// Run f() on the input, collect the results and take the first (and only) result.
val fx = rdd1.map(f(_)).collect.head
// The next stage's input will be (1, fx), (2, fx) distributed across 2 machines.
val rdd2 = sc.parallelize(Seq((1, fx), (2, fx)), numSlices = 2)
// Run g1() on one machine, g2() on the other.
val gxs = rdd2.map {
  case (1, x) => g1(x)
  case (2, x) => g2(x)
}.collect
val g1x = gxs(0)
val g2x = gxs(1)
// Same deal for h() as for f(). The input is (g1x, g2x), distributed to 1 machine.
val rdd3 = sc.parallelize(Seq((g1x, g2x)), numSlices = 1)
val res = rdd3.map { case (g1x, g2x) => h(g1x, g2x) }.collect.head

您可以看到 Spark 代码是基于 RDDs 的概念。一个 RDD 就像一个数组,除了它是在多台机器上分区的。 sc.parallelize() 从本地集合创建这样的并行集合。例如,上面代码中的rdd2 将从本地集合Seq((1, fx), (2, fx)) 创建并拆分到两台机器上。一台机器有Seq((1, fx)),另一台机器有Seq((2, fx))

接下来我们对 RDD 进行转换map 是一种常见的转换,它通过对每个元素应用一个函数来创建一个相同长度的新 RDD。 (与 Scala 的map 相同。)我们在rdd2 上运行的map 将用g1(x) 替换(1, x),用g2(x) 替换(2, x)。所以在一台机器上它会导致g1() 运行,而在另一台机器上g2() 会运行。

只有当您想要访问结果时,转换才会延迟运行。访问结果的方法称为actions。最直接的例子是collect,它将整个RDD的内容从集群下载到本地机器。 (与sc.parallelize()正好相反。)

如果你下载 Spark,启动 bin/spark-shell,然后将你的函数定义和上面的代码复制到 shell 中,你可以试试看。

【讨论】:

  • 非常感谢,您能否详细介绍一下Seq((1, fx), (2, fx) 和后续的map
  • 我添加了几段来解释。如果您仍有疑问,请告诉我!
猜你喜欢
  • 2016-07-21
  • 1970-01-01
  • 1970-01-01
  • 2013-01-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-19
相关资源
最近更新 更多