【问题标题】:Scala Fork-Join-All With Multiple Generic Types and 1 Generic Unit of Work具有多种通用类型和 1 个通用工作单元的 Scala Fork-Join-All
【发布时间】:2016-02-05 18:47:34
【问题描述】:

我正在尝试编写一个方法,该方法接受多个泛型类型并将一个工作单元作为参数来执行。

这个想法是工作单元是一个通用的功能,它本身就是通用的。举例来说,假设它类似于以下内容:

def loadModelRdd[T: TypeTag](sc: SparkContext): RDD[T] = {
  ...
}

loadModelRdd() 将在一些内部处理(例如加载模型信息等)之后构造给定类型的 RDD。

我一直在破解的原型方法如下所示(无效):

def forkAll[A : Manifest, B : Manifest](work: => RDD[_]): (RDD[A], RDD[B]) = {
  def aFuture = Future { work } // How can I notify that this work call returns type A?
  def bFuture = Future { work } // How can I notify that this work call returns type B?

  val res = for {
    a <- aFuture
    b <- bFuture
  } yield (a.asInstanceOf[A], b.asInstanceOf[B])

  Await.result(res, 10.seconds)
}

这是我正在处理的代码的缩短版本,因为我实际上正在考虑接受多达 10 种不同的类型。

如您所见,forkAll 方法的总体目标是将工作单元包装在 Future 中,fork-join 执行每种类型的工作单元,然后将结果作为 Tuple 的结果返回.一个示例消费者声明是:

val (a, b) = forkAll[ClassA, ClassB](loadModelRdd)

也就是说,我现在想要 fork-join 并等待结果,但我希望执行并行执行,然后收集回驱动程序(具体来说是 Spark 驱动程序)。

问题是我不确定在构造 Future {} 块时如何强制 forkAll 中的工作单元返回的类型。如果没有 forkAll,实现如下所示:

val resA = loadModelRdd[ClassA](sc)
val resB = loadModelRdd[ClassB](sc)
...

我考虑这样做有两个原因:

  1. 为与此模型匹配的任何工作单元抽象出 fork-join 的详细信息。
  2. 此代码的一个版本,它明确说明工作单元是什么,正在生产中工作,负责将长时间运行的块的执行减少近一半。我有几个可以应用此模式的执行步骤

这在 Scala 的类型系统中是可能的吗?还是我应该从不同的角度看待这个问题?我已经尝试了几种实现方式(包括一种描述为here),但我还没有找到一种适合我当前对问题的看法的方法

如果需要任何其他信息,请告诉我。

谢谢!

【问题讨论】:

    标签: scala generics reflection apache-spark scala-reflect


    【解决方案1】:

    简答:Scala does not allow functions with type parameters,所以你想要的并不完全可能。

    您正在尝试传递带有类型参数的方法。虽然方法可以有类型参数,但函数不能。当你尝试传递一个方法时,它就像一个匿名函数,所以你必须指定一个类型。

    但是,由于方法确实允许类型参数,因此您可以通过创建一个抽象类来执行您的 fork/join 来利用这一点

    abstract class ForkJoin {
    
      protected def work[T]: RDD[T]
    
      def apply[A, B]: (RDD[A], RDD[B]) = {
        // Write implementation of fork/join here
        (work[A], work[B])
      }
    }
    

    然后覆盖类型泛型work 方法,以便它执行您想要的操作,例如调用其他一些预定义的方法。

    val forkJoin = new ForkJoin {
      override protected def work[T]: RDD[T] =
        loadModelRdd[T](sc)
    }
    
    val (intRdd, stringRdd) = forkJoin[Int, String]
    

    请查看 this 以获取编译和运行没有问题的原型实现。

    【讨论】:

      猜你喜欢
      • 2019-05-15
      • 2019-02-02
      • 1970-01-01
      • 1970-01-01
      • 2015-01-19
      • 1970-01-01
      • 2017-09-15
      • 2023-03-30
      • 1970-01-01
      相关资源
      最近更新 更多