【问题标题】:Pyspark Invalid Input Exception try except errorPyspark Invalid Input Exception try except error
【发布时间】:2015-10-24 03:46:05
【问题描述】:

我正在尝试使用 pyspark 从 s3 读取过去 4 个月的数据并处理数据,但收到以下异常。

org.apache.hadoop.mapred.InvalidInputException:输入模式 s3://path_to_clickstream/date=201508*

每个月的第一天,由于 s3 路径中没有条目(单独的作业处理并将数据上传到 s3 路径,我的作业在该路径之前运行),作业失败。我想知道是否有办法让我捕获此异常并允许作业继续处理所有存在的路径?

【问题讨论】:

    标签: python amazon-s3 exception-handling apache-spark pyspark


    【解决方案1】:

    您可以简单地尝试在加载后触发一个廉价的操作并捕获Py4JJavaError

    from py4j.protocol import Py4JJavaError
    
    def try_load(path):
        rdd = sc.textFile(path)
        try:
            rdd.first()
            return rdd
        except Py4JJavaError as e:
            return sc.emptyRDD()
    
    rdd = try_load(s3_path)
    if not rdd.isEmpty():
        run_the_rest_of_your_code(rdd)
    

    编辑

    如果你想处理多条路径,你可以分别处理每一条并合并结果:

    paths = [
        "s3://path_to_inputdir/month1*/",
        "s3://path_to_inputdir/month2*/",
        "s3://path_to_inpu‌​tdir/month3*/"]
    
    rdds = sc.union([try_load(path) for path in paths])
    

    如果您想要更好的控制,您可以list content 并加载已知文件。

    如果这些路径中至少有一个是非空的,您应该能够使事情变得更简单,并像这样使用 glob:

    sc.textFile("s3://path_to_inputdir/month[1-3]*/")
    

    【讨论】:

    • 我实际上正在尝试做类似"s3://path_to_inputdir/month1*/,s3://path_to_inputdir/month2*/,s3://path_to_inputdir/month3*/" 的事情,所以在这种情况下,如果s3://path_to_inputdir/month3*/ 抛出异常,我应该如何处理?基本上我将多个 s3 路径传递给 sc.textFile() 命令,其中一个路径引发异常?
    • 当然。我又做了一项修改,让联合变得更加简单。
    • 触发一个廉价的动作来捕获异常 - 帮助了我另一个类似的场景
    • @anonuser0428 会考虑接受答案吗?谢谢。
    • from py4j.java_gateway import Py4JJavaError 出现导入错误。路径是正确的等等。当我解压缩此文件 (/usr/local/Cellar/apache-spark/1.6.2/python/lib/py4j-0.9-src.zip) 并检查 java_gateway.py 时,没有 Py4JJavaError。知道我做错了什么吗?
    猜你喜欢
    • 1970-01-01
    • 2023-02-06
    • 2019-06-26
    • 1970-01-01
    • 1970-01-01
    • 2017-05-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多