【发布时间】:2019-01-16 07:45:36
【问题描述】:
简而言之:我在 AWS 的 EMR 上运行一个 pySpark 应用程序。当我使用驻留在外部包中的外部模块中的自定义函数映射 rdd 时(在 .zip 文件中作为 --py-files 运送),集群卡住了 - 运行状态保持不变,直到不再出现日志行我手动终止它。
它不是什么: 这不是一个正确的导入异常 - 因为这会在执行导入行时终止应用程序,引发适当的异常,这不会发生。此外,如下所示,当被调用的函数驻留在“有问题的”模块中时,调用与 lambda 类似的函数映射的函数 - 有效。
它是什么:只有当程序试图在主程序中编写的转换中使用该模块中的函数作为映射函数时,才会出现错误。此外,如果我删除在外部文件(“有问题”模块)中突出显示的导入行 - 在这个最小的错误再现上下文中从未使用过的导入(但在实际使用的上下文中) - 错误停止存在。
以下是该错误的最小示例的代码,包括对 2 条重要行的注释和一些技术信息。任何帮助将不胜感激。
这里是主程序:
import spark_context_holder
from reproducing_bugs_external_package import reproducing_bugs_external_file
sc = spark_context_holder.sc
log = spark_context_holder.log
def make_nums_rdd():
return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)
log.warn("Starting my code!")
sum = sc.parallelize([1,2,3]*300).map(lambda x: x*x/1.45).sum()
log.warn("The calculated sum using in-line expression, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(sum))
simple_sum_rdd = make_nums_rdd()
log.warn("The calculated sum using the in-file function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = reproducing_bugs_external_file.make_nums_rdd(sc)
log.warn("The calculated sum using the external file's function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = sc.parallelize([1,2,3]*300).map(reproducing_bugs_external_file.calc_func)
log.warn("The calculated sum using the external file's mapping function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
# This last line does not get logged, while the others up until this one do. Here the cluster gets stuck on Running status without outputting any more log lines
在作为 --py-files 发布的 zip 文件中,我具有以下结构:
> spark_context_holde.py
> reproducing_bugs_external_package
>> __init__.py
>> reproducing_bugs_external_file.py
下面是它们各自的内容:
spark_context_holder.py
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("kac_walk_experiment")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger("dbg_et")
# sc.setLogLevel("ALL")
def getParallelismAlternative():
return int(sc.getConf().get('spark.cores.max'))
__init__.py
from . import reproducing_bugs_external_file
__all__ = [reproducing_bugs_external_file]
reproduce_bugs_external_file.py
import numpy
import spark_context_holder # If this is removed - the bug stops!
def make_nums_rdd(sc):
return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)
def calc_func(x):
return x*x/1.45
更多技术细节:
- 发布标签:emr-5.17.0
- Hadoop 发行版:Amazon 2.8.4
- 应用程序:Spark 2.3.1
- 使用python3.4,这是迄今为止安装在AWS机器上的第3个版本
【问题讨论】:
标签: python amazon-web-services pyspark amazon-emr