【问题标题】:Received message larger than max on a batch processing pipeline在批处理管道上接收到大于最大值的消息
【发布时间】:2016-11-17 20:26:05
【问题描述】:

我在每天都在谷歌云数据流服务上运行的批处理管道上收到此消息。它已开始失败并显示以下消息:

(88b342a0e3852af3): java.io.IOException: INVALID_ARGUMENT: Received message larger than max (21824326 vs. 4194304) 
dataflow-batch-jetty-11171129-7ea5-harness-waia talking to localhost:12346 at
com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.close(Native Method) at 
com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.close(ChunkingShuffleEntryWriter.java:67) at 
com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.close(ShuffleSink.java:286) at 
com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100) at 
com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:264) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:197) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:149) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at 
java.lang.Thread.run(Thread.java:745)

我仍在使用旧的解决方法来输出带有标题的 CSV 文件,例如

PCollection<String> output = data.apply(ParDo.of(new DoFn<String, String>() {
    String new_line = System.getProperty("line.separator");
    String csv_header = "id, stuff_1, stuff_2" + new_line;
    StringBuilder csv_body = new StringBuilder().append(csv_header);

    @Override
    public void processElement(ProcessContext c) {
        csv_body.append(c.element()).append(newline);
    }

    @Override
    public void finishBundle(Context c) throws Exception {
        c.output(csv_body.toString());
    }

})).apply(TextIO.Write.named("WriteData").to(options.getOutput()));

这是什么原因造成的?现在这个DoFn的输出是不是太大了?正在处理的数据集的大小没有增加。

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    这看起来可能是我们这边的一个错误,我们正在调查它,但总的来说,代码可能没有按照您的意愿执行。

    正如所写,您最终会得到未指定数量的输出文件,其名称以给定前缀开头,每个文件都包含不同数据块的预期 CSV 类输出(包括标题)的串联,以未指定的顺序。

    为了正确实现对 CSV 文件的写入,只需使用 TextIO.Write.withHeader() 指定标题,并完全删除您的 CSV 构造 ParDo。这也不会触发错误。

    【讨论】:

    • 它在 Google Cloud Dataflow Java SDK 1.5.0 上运行。我也知道新的TextIO.Write.withHeader(),但我还没有更新 SDK 或代码。我只是想知道为什么它现在开始失败了。
    • 在 SDK 版本 1.7.0 和 1.8.0 上成功运行。现在我正在查看 pom.xml 文件中已经包含 1.7.0 的代码。而且我不确定它是否会在 1.5.0 上运行,因为我已经更新了代码以使用新的 v1 数据存储 API 运行。
    • 关于它现在开始失败的原因:我们正在逐步推出 shuffle 的性能改进,而您的管道在改进版本中遇到了错误。我们正在努力修复。所以,我很困惑:使用 SDK 1.7.0/1.8.0 和 TextIO.Write.withHeader() 是否完全解决了您的问题?
    • 已通过仅切换到 1.7.0/1.8.0 解决。我还没有更新要使用 withHeader 的代码。
    • 我查看了卡住的工作。看起来您的类路径中同时有多个 SDK 版本(1.5.0、1.5.1、1.6.0、1.7.0、1.8.0),这会导致各种问题,因为类路径顺序是非确定性的。请确保您一次只有一个版本。
    猜你喜欢
    • 2021-12-07
    • 2019-08-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多