【发布时间】:2019-12-17 17:53:23
【问题描述】:
我有一个数据流工作:
- 从 GCS 读取包含其他文件名的文本文件
- 将文件名传递给 ReadAllFromParquet 以读取 .parquet 文件
- 写入 BigQuery
尽管我的工作“成功”,但它基本上没有超过 ReadAllFromParquet 步骤的输出集合。
我成功读取了列表中的文件,例如:['gs://my_bucket/my_file1.snappy.parquet','gs://my_bucket/my_file2.snappy.parquet','gs://my_bucket/my_file3.snappy.parquet']
我也在确认这个列表是正确的,并且文件的 GCS 路径是正确的,在 ReadAllFromParquet 之前的步骤中使用记录器。
这就是我的管道的样子(为简洁起见,省略了完整的代码,但我相信它通常可以正常工作,因为我使用 ReadAllFromText 为 .csv 提供了完全相同的管道并且工作正常):
with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:
try:
final_data = (
pipeline_2
|'Create empty PCollection' >> beam.Create([None])
|'Get accepted batch file: {}'.format(runtime_options.complete_batch) >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
|'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket))
|'Read all files' >> beam.io.ReadAllFromParquet(columns=['locationItemId','deviceId','timestamp'])
|'Process all files' >> beam.ParDo(ProcessSch2())
|'Transform to rows' >> beam.ParDo(BlisDictSch2())
|'Write to BigQuery' >> beam.io.WriteToBigQuery(
table = runtime_options.comp_table,
schema = SCHEMA_2,
project = pipeline_options_batch.view_as(GoogleCloudOptions).project, #options.display_data()['project'],
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, #'CREATE_IF_NEEDED',#create if does not exist.
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND #'WRITE_APPEND' #add to existing rows,partitoning
)
)
except Exception as exception:
logging.error(exception)
pass
这就是我的工作图之后的样子:
是否有人知道这里可能出了什么问题以及最好的调试方法是什么? 我目前的想法:
存储桶权限问题。我注意到我正在读取的存储桶很奇怪,因为尽管我是项目所有者,但我无法下载文件。项目的所有者只有“Storage Legacy Bucket Owner”。我添加了“存储管理员”,然后在使用我自己的帐户手动下载文件时工作正常。根据数据流文档,我确保默认计算服务帐户和数据流帐户在此存储桶上都有“存储管理员”。但是,也许这只是一个红鲱鱼,因为最终如果存在权限问题我应该在日志中看到这个并且作业会失败?
ReadAllFromParquet 需要不同格式的文件模式?我已经展示了一个列表示例(在我上面的图表中,我可以看到输入集合正确地显示了添加的元素 = 列表中 48 个文件的 48 个元素)我在上面提供。我知道这种格式适用于 ReadAllFromText,所以我认为它们是等效的并且应该可以工作。
=========
编辑: 注意到其他潜在的后果。与我使用 ReadAllFromText 并且工作正常的其他工作相比,我注意到命名略有不匹配,这令人担忧。
特别注意
Read all files/ReadAllFiles/ReadRange.out0
对
Read all files/Read all files/ReadRange.out0
路径的第一部分是我在这两个作业中的步骤名称。 但我相信第二个是来自 apache_beam.io.filebasedsource (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py) 的 ReadAllFiles 类,ReadAllFromText 和 ReadAllFromParquet 都调用它。
似乎是一个潜在的错误,但似乎无法在源代码中追踪它。
============== 编辑 2
经过更多挖掘后,ReadAllFromParquet 似乎还不能正常工作。 ReadFromParquet 调用 apache_beam.io.parquetio._ParquetSource 而 ReadAllFromParquet 只是调用
apache_beam.io.filebasedsource._ReadRange.
我想知道如果它是一个实验功能,是否有办法打开它?
【问题讨论】:
标签: google-cloud-platform google-cloud-dataflow apache-beam google-cloud-iam