【问题标题】:Reading a gzipped file using apache beam, wrapped in TextIOWrapper results in "'CompressedFile' object has no attribute 'writable'"使用 apache beam 读取 gzip 文件,包裹在 TextIOWrapper 中会导致“'CompressedFile'对象没有属性'writable'”
【发布时间】:2019-10-22 16:39:32
【问题描述】:

我正在努力在 apache Beam 中实现一个简单的 CSV 阅读器,以及来自 Beam 存储库的测试:https://github.com/apache/beam/blob/b85795adbd22d8b5cf9ebc684ce43e172a789587/sdks/python/apache_beam/io/fileio_test.py#L128-L148

def get_csv_reader(readable_file):
  import sys
  import csv
  import io
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with beam.Pipeline() as p:
  content_pc = (p
                | beam.Create([CSV_FILE])
                | fileio.ReadMatches()
                | beam.FlatMap(get_csv_reader)
                | beam.Map(print))

如果 CSV_FILE 未压缩并且我没有收到任何错误,则此方法可以正常工作。但是,如果我使用压缩文件作为输入,我会得到:

<ipython-input-114-4830c3592163> in get_csv_reader(readable_file)
      6   import io
      7   if sys.version_info >= (3, 0):
----> 8     return csv.reader(io.TextIOWrapper(readable_file.open()))
      9   else:
     10     return csv.reader(readable_file.open())

AttributeError: 'CompressedFile' object has no attribute 'writable' [while running 'FlatMap(get_csv_reader)']

我明白为什么会发生这种情况(TextIOWrapper 正在寻找一个可读和可写的对象)。是否有对 apache 光束/数据流有更深入了解的人可以建议如何最好地实现它来处理压缩和未压缩的输入?

【问题讨论】:

  • 我试过你的代码 sn-p,是的。有趣的问题。虽然我觉得根本原因与梁无关?将它放在正确的标签下将有助于更快地找到正确的答案。
  • 啊。我懂了。你说的对。我是 python API 的新手,没有意识到 ReadableFile 来自梁。
  • 这并不理想,但ReadableFile 包含文件路径作为名为@9​​87654326@ 的属性。您可以使用它来打开文件:filesystems.Filesystems.open(rf.metadata.path, compression=MY_COMPRESSION)。您可能仍需要使用 TextIOWrapper。 LMK 如果有帮助的话....
  • 不用担心。我刚刚写了一个修复:github.com/apache/beam/pull/9861 - 它应该在 Beam 2.18.0 中可用
  • 添加提示作为答案。

标签: python google-cloud-dataflow apache-beam


【解决方案1】:

从 Beam 的 2.18.0 版本开始,您将能够执行以下操作:

def get_csv_reader(readable_file):
  import sys
  import csv
  import io
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open(compression_type=MY_COMPRESSION)))
  else:
    return csv.reader(readable_file.open(compression_type=MY_COMPRESSION))

with beam.Pipeline() as p:
  content_pc = (p
                | beam.Create([CSV_FILE])
                | fileio.ReadMatches()
                | beam.FlatMap(get_csv_reader)
                | beam.Map(print))

【讨论】:

  • 这在我的初始测试中非常适合我。尽管现在有警告,因为文件是在返回阅读器的函数中打开的,但我无法弄清楚如何在正确的时间正确关闭它。 ResourceWarning: unclosed file &lt;_io.TextIOWrapper name='...'&gt;这是暂时可以忽略的吗?
【解决方案2】:

即使在我将 Beam 版本更新到 2.18.0 或 2.19.0 之后,Pablo (https://stackoverflow.com/a/58529353/3828305) 发布的答案也不起作用。

复制失败。

GZ_FILE_PATTERN='/local/path/to/some-wildcard-*.gz'

def get_csv_reader(readable_file):
    return csv.reader(io.TextIOWrapper(readable_file.open(compression_type=CompressionTypes.GZIP)))


with beam.Pipeline() as p:
    process = \
        (p
         | MatchFiles(GZ_FILE_PATTERN)
         | ReadMatches()
         | beam.FlatMap(get_csv_reader)
         | beam.Map(print)
         )

错误信息:

AttributeError: 'CompressedFile' 对象没有属性 'writable' [在运行 'FlatMap(get_csv_reader)' 时]

(尚未合并补丁且未修复 v2.19.0 的问题?)

目前我决定避免使用CompressionTypes.GZIP,自己解压文件。

重现以正常工作。

GZ_FILE_PATTERN='/local/path/to/some-wildcard-*.gz'

def get_csv_reader(readable_file_metadata):
    # Hack: Pass UNCOMPRESSED to BEAM
    with FileSystems.open(readable_file_metadata.path, compression_type=CompressionTypes.UNCOMPRESSED) as fopen:
        # decompress by myself
        decompressed_str = io.StringIO(gzip.decompress(fopen.read()).decode('utf-8'))
        return csv.reader(decompressed_str)


with beam.Pipeline() as p:
    process = \
        (p
         | MatchFiles(GZ_FILE_PATTERN)
         | beam.FlatMap(get_csv_reader)
         | beam.Map(print)
         )

【讨论】:

    【解决方案3】:

    正如gecko655 所述,接受的答案可能会导致错误。我们可以通过自己解压缩文件来避免建议的解决方法。

    def csv_reader(fn: str) -> List[str]:
        fp = GcsIO().open(fn)
        for r in gzip.open(fp):
            s = r.decode('utf-8')
            yield s.strip('\n').split(',')
    
    
    def parse_csv(fn: str):
        csv = csv_reader(fn)
        # todo: manipulate csv and return iterable
    
    
    result = (p | beam.Create([known_args.input]) | beam.FlatMap(parse_csv))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-29
      相关资源
      最近更新 更多