【问题标题】:Firehose JSON -> S3 Parquet -> ETL Spark, error: Unable to infer schema for ParquetFirehose JSON -> S3 Parquet -> ETL Spark,错误:无法推断 Parquet 的架构
【发布时间】:2018-12-05 14:31:40
【问题描述】:

看起来这应该很容易,就像它是这组功能的核心用例一样,但问题接踵而至。

最新的是尝试通过 Glue Dev 端点(PySpark 和 Scala 端点)运行命令。

按照此处的说明进行操作:https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_catalog(database="mydb", table_name="mytable")

产生这个错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/dynamicframe.py", line 557, in from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/context.py", line 136, in create_dynamic_frame_from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/data_source.py", line 36, in getFrame
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'

它还会在设置行之一中生成此警告:

18/06/26 19:09:35 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.

整体设置非常简单:我们有一个传入的 Kinesis 数据流,该流的处理器生成 JSON Kinesis 数据流,配置为将该 JSON 流写入 S3 中的 Parquet 文件的 Kinesis firehose 流,然后需要 Glue 目录配置才能做到这一点。

Athena 可以正常查看数据,但 Scala/PySpark 脚本出错。

有什么想法/建议吗?

【问题讨论】:

    标签: apache-spark pyspark parquet amazon-kinesis aws-glue


    【解决方案1】:

    好的,仍然不清楚为什么会发生这种情况,但是,找到了解决方法!

    基本上,而不是使用生成的代码:

    val datasource0 = glueContext.getCatalogSource(
            database = "db",
            tableName = "myTable",
            redshiftTmpDir = "",
            transformationContext = "datasource0"
        ).getDynamicFrame()
    

    使用此代码:

    val crawledData = glueContext.getSourceWithFormat(
            connectionType = "s3",
            options = JsonOptions(s"""{"path": "s3://mybucket/mytable/*/*/*/*/"}"""),
            format = "parquet",
            transformationContext = "source"
        ).getDynamicFrame()
    

    这里的关键位似乎是 */*/*/*/ - 如果我只指定根文件夹,我会收到 Parquet 错误,并且(显然)普通的 /**/* 通配符不起作用。

    【讨论】:

    • 我假设这只是忽略了粘合目录表,对吗?知道它是否支持文件排除吗?
    • 是的,全部忽略。我们实际上是在移动使用直接火花(仅在 Glue 作业中),而不是通过 Glue 上下文。不知道它是否支持排除文件。
    • 感谢您的回答!对于长期工作到期的临时存储桶,您是否遇到过 S3 令牌问题?如果是这样,我可以创建一个新的 SO 问题供您回答..
    • 我们没有使用临时存储桶,或者至少没有过期的存储桶。保留所有中间产品以用于调试和恢复目的实际上非常有用。整个事情不必一次成功!
    猜你喜欢
    • 2017-12-10
    • 2023-04-02
    • 2019-04-06
    • 2019-11-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-23
    相关资源
    最近更新 更多