【问题标题】:Calling Java/Scala function from a task从任务中调用 Java/Scala 函数
【发布时间】:2015-10-19 12:18:22
【问题描述】:

背景

我最初的问题是为什么在 map 函数中使用 DecisionTreeModel.predict 会引发异常? 并且与 How to generate tuples of (original lable, predicted label) on Spark with MLlib? 有关

当我们使用 Scala API a recommended way 获得对 RDD[LabeledPoint] 的预测时,使用 DecisionTreeModel 是简单地映射到 RDD

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

不幸的是,PySpark 中的类似方法效果不佳:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。如需更多信息,请参阅SPARK-5063

而不是 official documentation 推荐这样的东西:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

那么这里发生了什么?这里没有广播变量,Scala API 定义predict 如下:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

所以至少乍一看,从动作或转换调用不是问题,因为预测似乎是一种本地操作。

说明

经过一番挖掘,我发现问题的根源是从DecisionTreeModel.predict 调用的JavaModelWrapper.call 方法。调用Java函数需要accessSparkContext

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

问题

DecisionTreeModel.predict 的情况下,有一个推荐的解决方法,并且所有必需的代码都已经是 Scala API 的一部分,但是一般来说有什么优雅的方法来处理这样的问题吗?

目前只有我能想到的比较重量级的解决方案:

  • 通过隐式转换扩展 Spark 类或添加某种包装器,将所有内容推送到 JVM
  • 直接使用 Py4j 网关

【问题讨论】:

  • 这部分是正确的。我在将 Scala 中的相同代码实现放到 Python 中以用于决策树时遇到了同样的麻烦,并引发了相同的广播问题,因此不得不使用 .zip 函数将标签组合回来。谢谢你的解释!

标签: python scala apache-spark pyspark apache-spark-mllib


【解决方案1】:

使用默认 Py4J 网关进行通信是不可能的。要了解为什么我们必须查看 PySpark 内部文档 [1] 中的下图:

由于 Py4J 网关在驱动程序上运行,因此通过套接字与 JVM 工作人员通信的 Python 解释器无法访问它(参见例如 PythonRDD / rdd.py)。

理论上可以为每个工作人员创建一个单独的 Py4J 网关,但实际上它不太可能有用。忽略可靠性等问题 Py4J 根本不是为执行数据密集型任务而设计的。

有什么解决方法吗?

  1. 使用Spark SQL Data Sources API 包装JVM 代码。

    优点:受支持,高级别的,不需要访问内部 PySpark API

    缺点:相对冗长且没有很好的文档记录,主要限于输入数据

  2. 使用 Scala UDF 对 DataFrame 进行操作。

    优点:易于实现(请参阅Spark: How to map Python with Scala or Java User Defined Functions?),如果数据已经存储在 DataFrame 中,则无需在 Python 和 Scala 之间进行数据转换,对 Py4J 的访问最少

    缺点:需要访问 Py4J 网关和内部方法,仅限于 Spark SQL,难以调试,不支持

  3. 以类似于在 MLlib 中完成的方式创建高级 Scala 接口。

    优点:灵活,能够执行任意复杂代码。它可以直接在 RDD 上使用(例如参见 MLlib model wrappers)或使用 DataFrames(参见 How to use a Scala class inside Pyspark)。后一种解决方案似乎更友好,因为所有 ser-de 细节都已由现有 API 处理。

    缺点:低级,需要数据转换,和UDF一样需要访问Py4J和内部API,不支持

    一些基本的例子可以在Transforming PySpark RDD with Scala找到

  4. 使用外部工作流管理工具在 Python 和 Scala / Java 作业之间切换并将数据传递到 DFS。

    优点:易于实施,对代码本身的更改最少

    缺点:读取/写入数据的成本 (Alluxio?)

  5. 使用共享的SQLContext(参见例如Apache ZeppelinLivy)使用已注册的临时表在来宾语言之间传递数据。

    优点:非常适合交互式分析

    缺点:对于批处理作业(Zeppelin)来说不是很多,或者可能需要额外的编排(Livy)


  1. 约书亚·罗森。 (2014 年 8 月 4 日)PySpark Internals。取自https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-08-13
    • 2011-07-14
    • 2021-02-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多