【问题标题】:How to resolve pickle error in pyspark?如何解决pyspark中的pickle错误?
【发布时间】:2015-11-08 15:59:09
【问题描述】:

我正在遍历文件以收集有关字典中列和行中的值的信息。我有以下在本地工作的代码:

def search_nulls(file_name):
    separator = ','
    nulls_dict = {}
    fp = open(file_name,'r')
    null_cols = {}
    lines = fp.readlines()

    for n,line in enumerate(lines):
        line = line.split(separator)
        for m,data in enumerate(line):
            data = data.strip('\n').strip('\r')
            if str(m) not in null_cols:
                null_cols[str(m)] = defaultdict(lambda: 0)
            if len(data) <= 4:
                null_cols[str(m)][str(data)] = null_cols[str(m)][str(data)] + 1

    return null_cols


files_to_process = ['tempfile.csv']
results = map(lambda file: search_nulls(file), files_to_process)

上面的代码在没有火花的情况下可以正常工作。 我评论了上面的最后两行,并尝试使用 spark,因为这是需要运行分布式的东西的原型:

os.environ['SPARK_HOME'] = <path_to_spark_folder>
conf = SparkConf().setAppName("search_files").setMaster('local')

sc = SparkContext(conf=conf)

objects = sc.parallelize(files_to_process)
resulting_object = \
    objects.map(lambda file_object: find_nulls(file_object))

result = resulting_object.collect()

但是,当使用 spark 时,这会导致以下错误:

File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
TypeError: expected string or Unicode object, NoneType found​

我找不到任何明显的失败原因,因为它在本地完美运行,而且我没有在工作节点之间共享任何文件。事实上,无论如何我只是在我的本地机器上运行它。

有谁知道这可能失败的充分理由吗?

【问题讨论】:

    标签: python dictionary unicode apache-spark pyspark


    【解决方案1】:

    您的问题的根源是以下一行:

    null_cols[str(m)] = defaultdict(lambda: 0)
    

    您可以在the pickle module documentationWhat can be pickled and unpickled? 部分阅读:

    可以腌制以下几种:

    • ...
    • 在模块顶层定义的函数(使用 def,而不是 lambda)
    • 在模块顶层定义的内置函数
    • ...

    应该清楚lambda: 0 不符合上述条件。为了使其工作,您可以例如将 lambda 表达式替换为 int:

    null_cols[str(m)] = defaultdict(int)
    

    我们如何将 lambda 表达式传递给 PySpark 中的高阶函数?魔鬼在细节中。 PySpark 根据上下文使用不同的序列化程序。为了序列化闭包,包括 lambda 表达式,它使用支持 lambda 表达式和嵌套函数的自定义 cloudpickle。为了处理数据,它使用默认的 Python 工具。


    一些旁注:

    • 我不会使用 Python file 对象来读取数据。它不可移植,并且不能在本地文件系统之外工作。您可以改用SparkContex.wholeTextFiles
    • 如果您确实关闭了连接。使用with 语句通常是最好的方法
    • 您可以在拆分行之前安全地去除换行符

    【讨论】:

    • 所以,澄清一下,一般来说,一个可以在本地序列化的lambda函数应该可以被pyspark序列化?了解这一点对于在本地进行测试很有用。感谢您对这个问题的坚持。
    • 大多数时候是的。你必须考虑事情发生的时间和地点,一般来说我不会过度使用 lambdas。几乎所有常见的操作都可以使用内置函数执行,如果没有静态类型,则容易出错,本质上不可测试,而且非常冗长。
    猜你喜欢
    • 2020-09-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-04
    • 2018-11-25
    • 2017-10-06
    • 2020-11-01
    相关资源
    最近更新 更多