【问题标题】:Dataflow Job GCS to Pub/sub Maximum batch size数据流作业 GCS 到 Pub/sub 最大批量大小
【发布时间】:2020-12-15 01:34:27
【问题描述】:

我正在使用默认数据流模板 GCS 来发布/订阅。云存储中的输入文件,大小为 300MB,每个文件有 2-3 百万行。

启动数据流批处理作业时,会引发以下错误

来自工作人员的错误消息:javax.naming.SizeLimitExceededException:Pub/Sub 消息大小 (1089680070) 超过了最大批处理大小 (7500000) org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Write$PubsubBoundedWriter.processElement(PubsubIO.java:1160)

来自文档:Pub/Sub 一个批次最多接受 1,000 条消息,并且一个批次的大小不能超过 10 兆字节。

这是否意味着我必须将输入文件拆分为 10MB 块或 1000 条消息才能发布?

将如此大的文件(每个 300MB)加载到 pubsub 的推荐方法是什么?

提前感谢您的帮助。

【问题讨论】:

  • 您是否尝试过创建自定义模板并将“MaxBatchBytesSize”增加到更大的值?

标签: google-cloud-platform google-cloud-storage google-cloud-pubsub google-dataflow


【解决方案1】:

这是 Dataflow 端的一个已知限制,此时存在一个feature request 以增加批量大小的大小。使用 +1 按钮并为问题加注星标以跟踪它的进展。

我建议您查看此post,其中建议了解决方法。重要的是要考虑到此解决方法意味着修改 Cloud Storage Text to Pub/Sub 模板以实现其中提到的自定义转换。

另一方面,您可以尝试创建云功能来拆分您的文件,然后由 Dataflow 处理,我想是这样的:

  1. 创建一个“暂存”存储桶来上传您的大文件。
  2. 写一个Cloud Function 来拆分您的文件并将小块写入另一个存储桶。您可以尝试使用filesplit Python 包来执行此操作。
  3. 每次使用Google Cloud Storage Triggers在“暂存”存储桶中上传新文件时,触发云函数运行。
  4. 将文件拆分成小块后,使用相同的 Cloud Function 从“暂存”存储桶中删除大文件以避免额外费用。
  5. 使用 Dataflow 模板 Cloud Storage Text to Pub/Sub 处理第二个存储桶的小块。

【讨论】:

    猜你喜欢
    • 2021-03-21
    • 2018-12-02
    • 2021-05-31
    • 2019-07-11
    • 2019-07-03
    • 1970-01-01
    • 1970-01-01
    • 2021-10-09
    • 1970-01-01
    相关资源
    最近更新 更多