【问题标题】:Beam/Dataflow ReadAllFromParquet doesn't read anything but my job still succeeds?Beam/Dataflow ReadAllFromParquet 没有读取任何内容,但我的工作仍然成功?
【发布时间】:2019-12-17 17:53:23
【问题描述】:

我有一个数据流工作:

  1. 从 GCS 读取包含其他文件名的文本文件
  2. 将文件名传递给 ReadAllFromParquet 以读取 .parquet 文件
  3. 写入 BigQuery

尽管我的工作“成功”,但它基本上没有超过 ReadAllFromParquet 步骤的输出集合。 我成功读取了列表中的文件,例如:['gs://my_bucket/my_file1.snappy.parquet','gs://my_bucket/my_file2.snappy.parquet','gs://my_bucket/my_file3.snappy.parquet'] 我也在确认这个列表是正确的,并且文件的 GCS 路径是正确的,在 ReadAllFromParquet 之前的步骤中使用记录器。

这就是我的管道的样子(为简洁起见,省略了完整的代码,但我相信它通常可以正常工作,因为我使用 ReadAllFromText 为 .csv 提供了完全相同的管道并且工作正常):

 with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:

    try:

        final_data = (
        pipeline_2
        |'Create empty PCollection' >> beam.Create([None])
        |'Get accepted batch file: {}'.format(runtime_options.complete_batch) >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
        |'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket))
        |'Read all files' >> beam.io.ReadAllFromParquet(columns=['locationItemId','deviceId','timestamp'])
        |'Process all files' >> beam.ParDo(ProcessSch2())
        |'Transform to rows' >> beam.ParDo(BlisDictSch2())
        |'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table = runtime_options.comp_table, 
                schema = SCHEMA_2,
                project = pipeline_options_batch.view_as(GoogleCloudOptions).project, #options.display_data()['project'],
                create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  #'CREATE_IF_NEEDED',#create if does not exist.
                write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND    #'WRITE_APPEND' #add to existing rows,partitoning
                )
        )
    except Exception as exception:
        logging.error(exception)
        pass

这就是我的工作图之后的样子:

是否有人知道这里可能出了什么问题以及最好的调试方法是什么? 我目前的想法:

  1. 存储桶权限问题。我注意到我正在读取的存储桶很奇怪,因为尽管我是项目所有者,但我无法下载文件。项目的所有者只有“Storage Legacy Bucket Owner”。我添加了“存储管理员”,然后在使用我自己的帐户手动下载文件时工作正常。根据数据流文档,我确保默认计算服务帐户和数据流帐户在此存储桶上都有“存储管理员”。但是,也许这只是一个红鲱鱼,因为最终如果存在权限问题我应该在日志中看到这个并且作业会失败?

  2. ReadAllFromParquet 需要不同格式的文件模式?我已经展示了一个列表示例(在我上面的图表中,我可以看到输入集合正确地显示了添加的元素 = 列表中 48 个文件的 48 个元素)我在上面提供。我知道这种格式适用于 ReadAllFromText,所以我认为它们是等效的并且应该可以工作。

=========

编辑: 注意到其他潜在的后果。与我使用 ReadAllFromText 并且工作正常的其他工作相比,我注意到命名略有不匹配,这令人担忧。

这是我工作的输出集合的名称:

这就是我的拼花地板上的名字,实际上并没有读到任何东西:

特别注意

Read all files/ReadAllFiles/ReadRange.out0

Read all files/Read all files/ReadRange.out0

路径的第一部分是我在这两个作业中的步骤名称。 但我相信第二个是来自 apache_beam.io.filebasedsource (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py) 的 ReadAllFiles 类,ReadAllFromText 和 ReadAllFromParquet 都调用它。

似乎是一个潜在的错误,但似乎无法在源代码中追踪它。

============== 编辑 2

经过更多挖掘后,ReadAllFromParquet 似乎还不能正常工作。 ReadFromParquet 调用 apache_beam.io.parquetio._ParquetSource 而 ReadAllFromParquet 只是调用
apache_beam.io.filebasedsource._ReadRange.

我想知道如果它是一个实验功能,是否有办法打开它?

【问题讨论】:

    标签: google-cloud-platform google-cloud-dataflow apache-beam google-cloud-iam


    【解决方案1】:

    如果您使用的是最后一个 Beam SDK,您没有提到,请尝试使用 SDK 2.16 测试最后的更改。

    文档指出ReadAllFromParquet 和 ReadFromParquet 一样是一个实验性功能;尽管如此,ReadFromParquet 被报告为在此线程中工作Apache-Beam: Read parquet files from nested HDFS directories,您可能想尝试使用此功能。

    【讨论】:

    • ReadFromParquet 确实有效。 ReadAllFromParquet 目前只指向与 ReadAllFromText 相同的类,无法读取 parquet。您可以通过检查您共享的链接中的源代码来确认这一点。那就是除非2.16有源代码中看不到的变化?
    • master branch 应该包含最后的修改,但您也可以检查其他版本。根据我的评论,ReadAllFromParquet 和 ReadFromParquet 类都调用_ParquetSource,我想知道您是如何在原始问题的注释“EDIT 2”中到达 apache_beam.io.filebasedsource._ReadRange 的。无论如何,由于 ReadFromParquet 正在工作,我建议继续使用该功能。
    • 感谢您的意见。我实际上正在查看托管在 apache.org 上的 pydoc beam.apache.org/releases/pydoc/2.11.0/_modules/apache_beam/io/…,但现在意识到这是 2.11!幸运的是,我的工作目标发生了变化,所以我们不再需要那些镶木地板文件,所以暂时放弃了。如果可以的话,我肯定会回来,好像我使用的 2.15 确实调用了 _ParquetSource 问题出在我的代码中的其他地方。
    • 当然,如果您有新结果回来,我会尽力提供帮助。
    猜你喜欢
    • 2013-07-21
    • 2021-09-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-01-05
    • 2014-01-29
    • 2014-08-20
    相关资源
    最近更新 更多