【问题标题】:Handling out of memory exception when output of processElement is fat当 processElement 的输出很胖时处理内存不足异常
【发布时间】:2016-07-23 03:45:50
【问题描述】:


当我使用数据流时,我需要创建一些字符串行,其中用户 ID 列表用逗号分隔。然后在GCS中写入结果。
不幸的是,在DoFn的processElement期间,每一行都有太多的用户,导致java.lang.OutOfMemoryError。
有什么办法可以避免OutOfMemory异常并成功写入胖行GCS 中的每一行都带有文本文件?
我的源代码如下所示。

PCollection<KV<String, String>> rows = someData
    .apply(Combine.<String, String>perKey(new CombineUserIds()));

public static class CombineUserIds implements SerializableFunction<Iterable<String>, String> {
  private static final long serialVersionUID = 0;

  @Override
  public String apply(Iterable<String> userIdList) {
    return Joiner.on(",").join(userIdList);
  }
}

这里,源代码中的someDataPCollection&lt;KV&lt;String, String&gt;&gt;类型,key是group_id,value是user_id。
以下是完整的错误消息
(b997767fac436e5c): java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) at java.lang.StringBuilder.append(StringBuilder.java:136) at java.lang.StringBuilder.append(StringBuilder.java:76) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:457) at java.lang.StringBuilder.append(StringBuilder.java:166) at java.lang.StringBuilder.append(StringBuilder.java:76) at com.google.common.base.Joiner.appendTo(Joiner.java:111) at com.google.common.base.Joiner.appendTo(Joiner.java:152) at com.google.common.base.Joiner.join(Joiner.java:193) at com.google.common.base.Joiner.join(Joiner.java:183) at com.moloco.dataflow.ml.adhoc.GenerateMLUserProfileSet$CombineUserIds.apply(GenerateMLUserProfileSet.java:189) at com.moloco.dataflow.ml.adhoc.GenerateMLUserProfileSet$CombineUserIds.apply(GenerateMLUserProfileSet.java:184) at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeToSingleton(Combine.java:1613) at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeAccumulators(Combine.java:1591) at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeAccumulators(Combine.java:1536) at com.google.cloud.dataflow.sdk.transforms.Combine$CombineFn$2.mergeAccumulators(Combine.java:489) at com.google.cloud.dataflow.sdk.runners.worker.GroupAlsoByWindowsParDoFnFactory$MergingKeyedCombineFn.extractOutput(GroupAlsoByWindowsParDoFnFactory.java:249) at com.google.cloud.dataflow.sdk.runners.worker.GroupAlsoByWindowsParDoFnFactory$MergingKeyedCombineFn.extractOutput(GroupAlsoByWindowsParDoFnFactory.java:216) at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn$KeyedCombineFnRunner.extractOutput(GroupAlsoByWindowsAndCombineDoFn.java:243) at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.closeWindow(GroupAlsoByWindowsAndCombineDoFn.java:206) at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.processElement(GroupAlsoByWindowsAndCombineDoFn.java:192) at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226)

【问题讨论】:

  • 您是在本地运行管道,还是在 Google 云中运行?
  • 嗨@polleyg,我在谷歌云上运行。我也尝试了 n1-himem-32 workertype 。但它因内存不足异常而失败。

标签: java google-cloud-dataflow


【解决方案1】:

来自 Oracle docs

线程 thread_name 中的异常:java.lang.OutOfMemoryError:Java 堆空间

原因:详细信息Java heap space指示object could not 在 Java 堆中分配。此错误不一定意味着 内存泄漏。问题可以像配置问题一样简单, 其中指定的堆大小(或默认大小,如果不是 指定)对于应用程序来说是不够的。

解决方案1:如下增加JVM堆大小。

您可以为每个项目指定您的项目需要多少堆空间

以下是 Eclipse 的:

鼠标右键

Run As - Run Configuration - Arguments - Vm Arguments, 

然后添加这个

-Xmx1024 

-Xmx2048m

Solution2(仅在尝试过 Solution1 后):

再次来自 Oracle docs

3.4.3 监控等待完成的对象 当使用“Java 堆空间”引发 OutOfMemoryError 异常时 > 详细信息 消息,原因可能是过度使用终结器。诊断 这个,你有几个选项来监控对象的数量 正在等待最终确定

JConsole 管理工具可用于监控 待定的对象。此工具报告待处理 “摘要”选项卡窗格上的内存统计信息中的完成计数。 计数是近似的,但它可以用来表征一个 应用程序并了解它是否在很大程度上依赖于最终确定。

在 Oracle Solaris 和 Linux 操作系统上,jmap 实用程序可以 与 -finalizerinfo 选项一起使用以打印有关对象的信息 等待最终确定。

应用程序可以报告待处理对象的大致数量 使用 getObjectPendingFinalizationCount 方法完成 java.lang.management.MemoryMXBean 类。 API 的链接 文档和示例代码可以在Custom Diagnostic Tools 中找到。示例代码可以轻松扩展以包含 报告待定完成计数。

【讨论】:

  • 感谢@Sundararaj Govindasamy 的友好回答。但是,我没有使用 eclipse 来运行数据流作业,并且每一行的内存可能会超过您在建议中给我的内存。我认为我的问题的解决方案应该更多地与数据流本身而不是 java 相关。无论如何,非常感谢。
  • @Kyuntae,无论您是否使用 eclipse,潜在问题仅与 Java 有关。 userIdList 的大小是多少?以及您要加入多少个 UserIdLists?整个 Joiner 中的对象总数是多少?例如,如果整个 joiner 试图加入 10000 个对象,它试图在 Java Heap 中保存这 10000 个对象,但没有可用的内存来保存这些对象。您需要为 Java Heap 分配更多内存,或者您可以编写有效使用内存的代码,例如当对象没有引用时从内存中清除对象。希望这会有所帮助
  • 嗨@Sundararaj Govindasamy,据我所知,数据流不允许使用 -Xmx 标志设置堆大小(请参阅here 以避免对洗牌器产生负面影响。在我的情况下,每一行的size 太大,无法加载到内存中。(20 亿个 userId,每个 userId 的大小约为 40~50bytes)
  • 我从来没有在数据流中工作过,所以我不能帮你。为什么你试图一次加载?我认为这不是一个好主意。
  • 因为我要满足其他团队要求的数据格式。谢谢你帮助我@Sundararaj
猜你喜欢
  • 1970-01-01
  • 2016-09-02
  • 1970-01-01
  • 2011-04-13
  • 2014-11-12
  • 1970-01-01
  • 1970-01-01
  • 2014-07-20
相关资源
最近更新 更多