【Question title】: How to use an external (custom) package in PySpark?
【Posted】: 2018-04-16 12:23:40
【Question】:

I am trying to replicate the solution given here https://www.cloudera.com/documentation/enterprise/5-7-x/topics/spark_python.html to import an external package in PySpark, but it fails.

My code:

spark_distro.py

from pyspark import SparkContext, SparkConf

def import_my_special_package(x):
    # Imported inside the function, so the import runs on the executors
    from external_package import external
    return external(x).fun()

conf = SparkConf()
sc = SparkContext(conf=conf)
int_rdd = sc.parallelize([1, 2, 3, 4])
print(int_rdd.map(lambda x: import_my_special_package(x)).collect())

external_package.py

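class external:

    def __init__(self, value):
        # Store the number to be tripled (`in` is a Python keyword and cannot be a parameter name)
        self.value = value

    def fun(self):
        return self.value * 3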

spark-submit command:

spark-submit \
   --master yarn \
  /path to script/spark_distro.py  \
  --py-files /path to script/external_package.py \
  1000

Actual error:

  vs = list(itertools.islice(iterator, batch))
  File "/home/gsurapur/pyspark_examples/spark_distro.py", line 13, in <lambda>
  File "/home/gsurapur/pyspark_examples/spark_distro.py", line 6, in import_my_special_package
ImportError: No module named external_package

Expected output:

[3,6,9,12]

I also tried the sc.addPyFile option, but it fails with the same error.

【Question discussion】:

    Tags: apache-spark pyspark hadoop-yarn


    【Answer 1】:

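    I know it sounds silly in hindsight, but the order of spark-submit arguments is generally not interchangeable: all Spark-related arguments, including --py-files, must come before the script to be executed. Anything placed after the script path is passed to the script itself as an application argument, so spark-submit never sees it: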

    # your case:
    spark-submit --master yarn-client /home/ctsats/scripts/SO/spark_distro.py --py-files /home/ctsats/scripts/SO/external_package.py
    [...]
    ImportError: No module named external_package
    
    # correct usage:
    spark-submit --master yarn-client --py-files /home/ctsats/scripts/SO/external_package.py /home/ctsats/scripts/SO/spark_distro.py
    [...]
    [3, 6, 9, 12]
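The ordering rule can be illustrated with a small, hypothetical helper (not part of Spark) that splits a command line the way the usage line of spark-submit describes it, i.e. something of the form `spark-submit [options] <app jar | python file> [app arguments]`: options are read until the first non-option token, that token is taken as the application file, and every token after it is handed to the application untouched. It is a simplified sketch that assumes every option takes exactly one value, which does not hold for flag-style options such as `--verbose`.

```python
from typing import Dict, List, Tuple


def split_submit_args(argv: List[str]) -> Tuple[Dict[str, str], str, List[str]]:
    """Hypothetical, simplified model of spark-submit argument handling.

    Assumes every option has the form `--name value` (true for --master
    and --py-files, but not for flags such as --verbose).
    """
    options: Dict[str, str] = {}
    i = 0
    # Consume "--option value" pairs until the first non-option token.
    while i < len(argv) and argv[i].startswith("--"):
        options[argv[i]] = argv[i + 1]
        i += 2
    if i >= len(argv):
        raise ValueError("no application file given")
    app_file = argv[i]
    # Everything after the application file belongs to the application.
    app_args = argv[i + 1:]
    return options, app_file, app_args


wrong = ["--master", "yarn", "spark_distro.py",
         "--py-files", "external_package.py", "1000"]
right = ["--master", "yarn", "--py-files", "external_package.py",
         "spark_distro.py", "1000"]

print(split_submit_args(wrong))
# ({'--master': 'yarn'}, 'spark_distro.py', ['--py-files', 'external_package.py', '1000'])
print(split_submit_args(right))
# ({'--master': 'yarn', '--py-files': 'external_package.py'}, 'spark_distro.py', ['1000'])
```

In the first ordering, `--py-files` ends up in the script's own argument list, where `spark_distro.py` simply ignores it, so the file is never shipped to the executors.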
    

    I tested it with your script, modified as follows:

    spark_distro.py

    from pyspark import SparkContext, SparkConf
    
    def import_my_special_package(x):
        from external_package import external
        return external(x)
    
    conf = SparkConf()
    sc = SparkContext(conf=conf)
    int_rdd = sc.parallelize([1, 2, 3, 4])
    print(int_rdd.map(lambda x: import_my_special_package(x)).collect())
    

    external_package.py

    def external(x):
        return x*3
    

    The modifications arguably do not change the essence of the problem...
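The ImportError itself is ordinary Python behaviour: a module can only be imported if its file is on the `sys.path` of the process that executes the import, and here the import runs inside the mapped function, i.e. on the executors rather than on the driver. The Spark-free sketch below reproduces that mechanism locally; the temporary directory is only a stand-in (an assumption for illustration) for wherever Spark places files shipped with `--py-files` or `sc.addPyFile`.

```python
import importlib
import os
import sys
import tempfile

# Stand-in for the location Spark would make importable on an executor.
ship_dir = tempfile.mkdtemp()
with open(os.path.join(ship_dir, "external_package.py"), "w") as f:
    f.write("def external(x):\n    return x * 3\n")


def import_my_special_package(x):
    # Same pattern as the script: import inside the function body.
    from external_package import external
    return external(x)


# Before the file's directory is on sys.path, the import fails.
try:
    import_my_special_package(1)
    failed = False
except ImportError:
    failed = True
print("ImportError before shipping:", failed)

# "Ship" the file by making its directory importable, then retry.
sys.path.insert(0, ship_dir)
importlib.invalidate_caches()
print(list(map(import_my_special_package, [1, 2, 3, 4])))  # [3, 6, 9, 12]
```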

    【Discussion】:

    • Yes, you are right. The order of the arguments was the problem. I was about to post that here when I saw your answer. Thanks @desertnaut
    【Answer 2】:

    Here is the addPyFile case:

    spark_distro2.py

    from pyspark import SparkContext, SparkConf
    
    def import_my_special_package(x):
        from external_package import external
        return external(x)
    
    conf = SparkConf()
    sc = SparkContext(conf=conf)
    sc.addPyFile("/home/ctsats/scripts/SO/external_package.py") # added
    int_rdd = sc.parallelize([1, 2, 3, 4])
    print(int_rdd.map(lambda x: import_my_special_package(x)).collect())
    

    Test run:

    spark-submit --master yarn-client /home/ctsats/scripts/SO/spark_distro2.py
    [...]
    [3, 6, 9, 12]
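When the external code is a package with several modules rather than a single .py file, both `--py-files` and `sc.addPyFile` also accept a `.zip` archive. The sketch below builds such an archive with the standard-library `zipfile` module and then checks, locally and without Spark, that the package imports from it through Python's built-in zip import support, which is roughly what happens on the executors once the archive is on their Python path. The package name `my_pkg` and its layout are hypothetical.

```python
import os
import sys
import tempfile
import zipfile


def build_package_zip(src_dir: str, package: str, zip_path: str) -> str:
    """Zip src_dir/package so that `package/` sits at the archive root."""
    pkg_root = os.path.join(src_dir, package)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for dirpath, _dirnames, filenames in os.walk(pkg_root):
            for name in filenames:
                if name.endswith(".py"):
                    full = os.path.join(dirpath, name)
                    # Archive names are relative to src_dir, e.g. my_pkg/external.py
                    zf.write(full, os.path.relpath(full, src_dir))
    return zip_path


# Create a hypothetical package my_pkg/ containing one module.
work = tempfile.mkdtemp()
os.makedirs(os.path.join(work, "my_pkg"))
with open(os.path.join(work, "my_pkg", "__init__.py"), "w") as f:
    f.write("")
with open(os.path.join(work, "my_pkg", "external.py"), "w") as f:
    f.write("def external(x):\n    return x * 3\n")

zip_path = build_package_zip(work, "my_pkg", os.path.join(work, "my_pkg.zip"))

# Local check only: put the archive itself on sys.path and import from it.
sys.path.insert(0, zip_path)
from my_pkg.external import external

print([external(x) for x in [1, 2, 3, 4]])  # [3, 6, 9, 12]
```

The archive would then be passed with `--py-files` (before the script) or to `sc.addPyFile`, and the mapped function would import `from my_pkg.external import external`.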
    

    【Discussion】:
