【问题标题】:Error when running random_forest_example.py in spark with python使用 python 在 spark 中运行 random_forest_example.py 时出错
【发布时间】:2015-10-05 16:04:05
【问题描述】:

我是 Spark 的新手,所以我浏览了 Spark 示例文件夹中提供的一些示例。当我尝试 random_forest_example.py 时,出现以下错误:

py4j.protocol.Py4JJavaError: 调用时出错 z:org.apache.spark.api.python.PythonRDD.runJob。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 2.0 中的任务 0 失败 1 次,最近一次失败:丢失任务 0.0 在阶段 2.0 (TID 3, localhost): java.net.SocketException: Connection 由对等方重置:套接字写入错误 java.net.SocketOutputStream.socketWrite0(本机方法)在 java.net.SocketOutputStream.socketWrite(Unknown Source) at java.net.SocketOutputStream.write(Unknown Source) at java.io.BufferedOutputStream.flushBuffer(未知来源)在 java.io.BufferedOutputStream.flush(未知来源)在 java.io.DataOutputStream.flush(未知来源)在 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:251) 在 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772) 在 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

我运行的代码是:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.evaluation import MulticlassMetrics, RegressionMetrics
from pyspark.mllib.util import MLUtils
from pyspark.sql import Row, SQLContext

"""
A simple example demonstrating a RandomForest Classification/Regression Pipeline.
Run with:
  bin/spark-submit examples/src/main/python/ml/random_forest_example.py
"""


def testClassification(train, test):
    # Train a RandomForest model.
    # Setting featureSubsetStrategy="auto" lets the algorithm choose.
    # Note: Use larger numTrees in practice.

    rf = RandomForestClassifier(labelCol="indexedLabel", numTrees=3, maxDepth=4)

    model = rf.fit(train)
    predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
        .map(lambda x: (x.prediction, x.indexedLabel))

    metrics = MulticlassMetrics(predictionAndLabels)
    print("weighted f-measure %.3f" % metrics.weightedFMeasure())
    print("precision %s" % metrics.precision())
    print("recall %s" % metrics.recall())


def testRegression(train, test):
    # Train a RandomForest model.
    # Note: Use larger numTrees in practice.

    rf = RandomForestRegressor(labelCol="indexedLabel", numTrees=3, maxDepth=4)

    model = rf.fit(train)
    predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
        .map(lambda x: (x.prediction, x.indexedLabel))

    metrics = RegressionMetrics(predictionAndLabels)
    print("rmse %.3f" % metrics.rootMeanSquaredError)
    print("r2 %.3f" % metrics.r2)
    print("mae %.3f" % metrics.meanAbsoluteError)


if __name__ == "__main__":
    if len(sys.argv) > 1:
        print("Usage: random_forest_example", file=sys.stderr)
        exit(1)
    sc = SparkContext(appName="PythonRandomForestExample")
    sqlContext = SQLContext(sc)

    # Load and parse the data file into a dataframe.
    df = MLUtils.loadLibSVMFile(sc, "D:\spark-1.4.0\examples\src\main\python\ml\sample_libsvm_data.txt").toDF()

    # Map labels into an indexed column of labels in [0, numLabels)
    stringIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
    si_model = stringIndexer.fit(df)
    td = si_model.transform(df)
    [train, test] = td.randomSplit([0.7, 0.3])
    testClassification(train, test)
    testRegression(train, test)
    sc.stop()

我逐行检查,发现错误是在这一行产生的

 df = MLUtils.loadLibSVMFile(sc, "D:\spark1.4.0\examples\src\main\python\ml\sample_libsvm_data.txt").toDF()

.toDF() 方法似乎有问题,但我不知道是什么原因造成的。谁能帮我解决这个问题。

【问题讨论】:

  • 您确定这是一条正确的路径吗?示例数据位于data 而不是examples
  • @zero323 感谢您的回复。是,我确定。原始路径对我不起作用,因此我将数据文件移动到我指定的位置。

标签: python apache-spark


【解决方案1】:

您是否在后台运行 Spark 实例?如果没有,您应该在脚本中本地部署 Spark(配置中的“setMaster("local")”部分),这是来自 Spark 官方文档:

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)code here

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-10-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多