【问题标题】:Pyspark socket timeout exception after application running for a while应用程序运行一段时间后 Pyspark 套接字超时异常
【发布时间】:2016-11-19 22:09:17
【问题描述】:

我正在使用 pyspark 来估计逻辑回归模型的参数。我使用 spark 计算似然度和梯度,然后使用 scipy 的最小化函数进行优化(L-BFGS-B)。

我使用 yarn-client 模式来运行我的应用程序。我的应用程序可以毫无问题地开始运行。但是,过了一会儿它报告了以下错误:

Traceback (most recent call last):
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
    res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
    options={'disp': False})
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
    callback=callback, **options)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
    f, g = func_and_grad(x)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
    f = fun(x, *args)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
    return function(*(wrapper_args + args))
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
    return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
    yield self._read_with_length(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
    data = self._sock.recv(left)
socket.timeout: timed out

当我将 spark 日志级别设置为“ALL”时,我还发现了 python broken pipe 错误。

我使用的是 Spark 1.6.2 和 Java 1.8.0_91。知道发生了什么吗?

--更新--

我发现这与我在程序中使用的优化程序有关。

我所做的是使用 EM 算法(作为迭代算法)使用最大似然法估计统计模型。在每次迭代期间,我需要通过解决最小化问题来更新参数。 Spark 负责计算我的可能性和梯度,然后将其传递给我使用 L-BFGS-B 方法的 Scipy 的最小化例程。似乎这个例程中的某些东西使我的 Spark 工作崩溃了。但我不知道例程的哪一部分导致了这个问题。

另一个观察结果是,在使用相同的样本和相同的程序时,我改变了分区的数量。当分区数量很少时,我的程序可以毫无问题地完成。但是,当分区数量变大时,程序开始崩溃。

【问题讨论】:

    标签: exception optimization apache-spark pyspark


    【解决方案1】:

    我有类似的问题。我有一个迭代,有时执行时间太长以至于超时。增加spark.executor.heartbeatInterval 似乎可以解决问题。我将它增加到 3600 秒,以确保我不会再次遇到超时,并且从那时起一切正常。

    发件人:http://spark.apache.org/docs/latest/configuration.html

    spark.executor.heartbeatInterval 10s 每个执行器对驱动程序的心跳之间的间隔。 Heartbeats 让驱动程序知道执行程序仍然存在,并使用正在进行的任务的指标对其进行更新。

    【讨论】:

    • 奇怪的是默认配置会因为大型任务而崩溃。这不就是 Spark 的用途吗?
    • 执行者应该仍然响应,即使它正在“做事”。
    【解决方案2】:

    我遇到了类似的问题,对我来说,这解决了它:

    import pyspark as ps
    
    conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer")
    conf.set("spark.executor.heartbeatInterval","3600s")
    sc = ps.SparkContext('local[4]', '', conf=conf) # uses 4 cores on your local machine
    

    在此处设置其他选项的更多示例: https://gist.github.com/robenalt/5b06415f52009c5035910d91f5b919ad

    【讨论】:

    • 我在单节点系统上运行 Spark Streaming 以进行测试时遇到了同样的问题。实际上是 'local[4]' 参数修复了它!根据我的经验,更改 "spark.executor.heartbeatInterval"(以及 spark.network.timeout,因为它必须大于 heartbeatInterval)在这种情况下没有任何效果。
    • 我在本地运行应用程序时遇到了同样的问题。我尝试了local[4]local[*],并尝试增加heartbeatIntervalnetwork.timeout,但没有效果。使用local 为我解决了这个问题。配置:(通过 Anaconda 的 pyspark 3.1.2,处理器:2 核(4 个逻辑单元))
    【解决方案3】:

    查看执行程序日志以获取详细信息。当执行器死亡或被集群管理器杀死时(通常是因为使用的内存比容器配置的更多),我也看到过类似的错误。

    【讨论】:

      【解决方案4】:

      在 IBM 的 SPSS Modeler 中使用 Pyspark 扩展节点时,我们遇到了同样的问题。上述所有解决方案(以及在 Internet 上可以找到的其他解决方案)均无效。在某些时候,我们发现当我的同事和我在同一台机器上同时执行 Pyspark 扩展节点时,总是会发生这种情况。 这似乎让 Python 工作人员陷入困境或被杀。唯一的解决方案是不要同时执行 Pyspark 的东西......

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2011-11-02
        • 1970-01-01
        • 1970-01-01
        • 2019-04-01
        • 1970-01-01
        • 2012-09-06
        • 2012-10-24
        相关资源
        最近更新 更多