【问题标题】:Spark mapPartitions vs transient lazy valSpark mapPartitions vs 瞬态惰性值
【发布时间】:2017-04-08 00:22:21
【问题描述】:

我想知道使用 sparks mapPartitions 功能与瞬态惰性验证有什么不同。
由于每个分区基本上都在不同的节点上运行,因此将在每个节点上创建一个瞬态惰性 val 实例(假设它在一个对象中)。

例如:

class NotSerializable(v: Int) {
  def foo(a: Int) = ???
}

object OnePerPartition {
  @transient lazy val obj: NotSerializable = new NotSerializable(10)
}

object Test extends App{
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1 to 100000)

    rdd.map(OnePerPartition.obj.foo)

    // ---------- VS ----------

    rdd.mapPartitions(itr => {
      val obj = new NotSerializable(10)
      itr.map(obj.foo)
    })
}

有人可能会问你为什么想要它...
我想创建一个通用容器概念,用于在任何通用集合实现(RDDListscalding pipe 等)上运行我的逻辑
他们都有“地图”的概念,但mapPartitionspark 来说是独一无二的。

【问题讨论】:

    标签: dictionary apache-spark partition transient


    【解决方案1】:

    首先,这里不需要transient lazy。使用 object 包装器足以完成这项工作,您实际上可以这样写:

    object OnePerExecutor {
      val obj: NotSerializable = new NotSerializable(10)
    }
    

    对象包装器和在mapPartitions 中初始化NotSerializable 之间存在根本区别。这个:

    rdd.mapPartitions(iter => {
      val ns = NotSerializable(1)
      ???
    })
    

    为每个分区创建一个 NotSerializable 实例。

    另一方面,对象包装器为每个执行程序 JVM 创建一个 NotSerializable 实例。结果这个实例:

    • 可用于处理多个分区。
    • 可以被多个执行线程同时访问。
    • 使用寿命超过使用它的函数调用。

    这意味着它应该是线程安全的,并且任何方法调用都应该没有副作用。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-04-27
      • 2017-11-06
      • 2019-09-01
      • 1970-01-01
      • 2016-08-25
      • 1970-01-01
      相关资源
      最近更新 更多