【发布时间】:2019-10-22 16:39:32
【问题描述】:
我正在努力在 apache Beam 中实现一个简单的 CSV 阅读器,以及来自 Beam 存储库的测试:https://github.com/apache/beam/blob/b85795adbd22d8b5cf9ebc684ce43e172a789587/sdks/python/apache_beam/io/fileio_test.py#L128-L148
def get_csv_reader(readable_file):
import sys
import csv
import io
if sys.version_info >= (3, 0):
return csv.reader(io.TextIOWrapper(readable_file.open()))
else:
return csv.reader(readable_file.open())
with beam.Pipeline() as p:
content_pc = (p
| beam.Create([CSV_FILE])
| fileio.ReadMatches()
| beam.FlatMap(get_csv_reader)
| beam.Map(print))
如果 CSV_FILE 未压缩并且我没有收到任何错误,则此方法可以正常工作。但是,如果我使用压缩文件作为输入,我会得到:
<ipython-input-114-4830c3592163> in get_csv_reader(readable_file)
6 import io
7 if sys.version_info >= (3, 0):
----> 8 return csv.reader(io.TextIOWrapper(readable_file.open()))
9 else:
10 return csv.reader(readable_file.open())
AttributeError: 'CompressedFile' object has no attribute 'writable' [while running 'FlatMap(get_csv_reader)']
我明白为什么会发生这种情况(TextIOWrapper 正在寻找一个可读和可写的对象)。是否有对 apache 光束/数据流有更深入了解的人可以建议如何最好地实现它来处理压缩和未压缩的输入?
【问题讨论】:
-
我试过你的代码 sn-p,是的。有趣的问题。虽然我觉得根本原因与梁无关?将它放在正确的标签下将有助于更快地找到正确的答案。
-
啊。我懂了。你说的对。我是 python API 的新手,没有意识到 ReadableFile 来自梁。
-
这并不理想,但
ReadableFile包含文件路径作为名为@987654326@ 的属性。您可以使用它来打开文件:filesystems.Filesystems.open(rf.metadata.path, compression=MY_COMPRESSION)。您可能仍需要使用 TextIOWrapper。 LMK 如果有帮助的话.... -
不用担心。我刚刚写了一个修复:github.com/apache/beam/pull/9861 - 它应该在 Beam 2.18.0 中可用
-
添加提示作为答案。
标签: python google-cloud-dataflow apache-beam