【问题标题】:Write-streaming to Google Cloud Storage in Python使用 Python 向 Google Cloud Storage 写入流式传输
【发布时间】:2019-04-03 18:27:18
【问题描述】:

我正在尝试将用Python 编写的AWS Lambda 函数迁移到 CF

  1. 即时解压缩并逐行读取
  2. 对每一行执行一些光变换
  3. 将未压缩的输出(一次一行或多个块)写入 GCS

输出大于 2GB - 但略小于 3GB,因此它适合 Lambda正好

嗯,这似乎是不可能的,或者更多地涉及GCP

  • 未压缩的文件无法放入内存或/tmp - 在撰写本文时限制为 2048MB - 因此无法使用 Python 客户端库 upload_from_file(或 _filename
  • this 官方文件,但令我惊讶的是,它指的是boto,一个最初为AWS S3 设计的库,并且由于boto3 已经存在一段时间了,所以它已经过时了。没有真正的GCP 方法来流式写入或读取
  • Node.js 有一个简单的 createWriteStream() - 不错的文章 here 顺便说一句 - 但在 Python 中没有等效的单行代码
  • Resumable media upload 听起来很像,但是在 Node 中处理的很多代码更容易
  • AppEngine 有 cloudstorage,但在它之外不可用 - 并且已过时
  • 在工作包装器上几乎没有示例,用于逐行写入文本/纯数据,就好像GCS 是本地文件系统一样。这不仅限于Cloud Functions 和 Python 客户端库的缺失功能,但由于资源限制,它在 CF 中更为严重。顺便说一句,我是 discussion 的一部分,添加了一个可写的 IOBase 函数,但它没有任何吸引力。
  • 显然使用虚拟机或DataFlow 对手头的任务来说是不可能的。

在我看来,从基于云的存储中读取/写入的流(或类似流)甚至应该包含在 Python 标准库中。

按照当时的建议,您仍然可以使用GCSFS,它会在您将内容写入 FileObj 时在后台为您分块提交上传。 同一个团队写了s3fs。我不知道 Azure。

AFAIC,我会坚持使用AWS Lambda,因为输出可以容纳在内存中 - 目前 - 但分段上传是支持任何输出大小且内存最少的方法。

想法或替代方案?

【问题讨论】:

  • upload_from_file 使用类似文件的对象,所以也许您可以使用生成器来完成您想要的工作?
  • 不幸的是,它要求文件处理程序以只读模式打开,而不是混合(读/写)。换句话说,该文件必须已经完整存在。目标是读取(写入 GCS/S3)作为写入内存中的处理程序。

标签: python aws-lambda google-cloud-storage google-cloud-functions azure-storage


【解决方案1】:

我对 multipartresumable 上传感到困惑。后者是您“流式传输”所需要的——它实际上更像是上传缓冲流的块。

Multipart 上传是在同一个 API 调用中一次加载数据和自定义元数据。

虽然我非常喜欢 GCSFS - Martin,但他的主要贡献者非常敏感 - 我最近发现 an alternative 使用了 google-resumable-media 库。

GCSFS 建立在核心 http API 之上,而 Seth 的解决方案使用由 Google 维护的低级库,与 API 更改更加同步,其中包括指数备份。后者对于大/长流来说确实是必须的,因为连接可能会中断,即使在 GCP 内 - 我们遇到了 GCF 的问题。

最后,我仍然相信Google Cloud Library 是添加类似流的功能的正确位置,基本的writeread。它有core code already

如果您也对核心库中的该功能感兴趣,请点赞here - 假设优先级基于此。

【讨论】:

    【解决方案2】:

    smart_open 现在支持 GCS,还支持动态解压。

    import lzma
    from smart_open import open, register_compressor
    
    def _handle_xz(file_obj, mode):
        return lzma.LZMAFile(filename=file_obj, mode=mode, format=lzma.FORMAT_XZ)
    
    register_compressor('.xz', _handle_xz)
    
    # stream from GCS
    with open('gs://my_bucket/my_file.txt.xz') as fin:
        for line in fin:
            print(line)
    
    # stream content *into* GCS (write mode):
    with open('gs://my_bucket/my_file.txt.xz', 'wb') as fout:
        fout.write(b'hello world')
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-02-14
      • 1970-01-01
      • 1970-01-01
      • 2020-07-09
      • 1970-01-01
      • 2021-11-05
      • 2023-04-08
      相关资源
      最近更新 更多