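【Posted】: 2016-07-23 03:45:50
【Question】:
When I use Dataflow, I need to create string lines in which a list of user IDs is separated by commas, and then write the result to GCS.
Unfortunately, each line has too many users, which causes a java.lang.OutOfMemoryError during processElement of a DoFn.
Is there any way to avoid the OutOfMemoryError and still write these very long ("fat") lines to text files in GCS, one line per group?
My source code looks like this.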
PCollection<KV<String, String>> rows = someData
    .apply(Combine.<String, String>perKey(new CombineUserIds()));

public static class CombineUserIds implements SerializableFunction<Iterable<String>, String> {
  private static final long serialVersionUID = 0;

  @Override
  public String apply(Iterable<String> userIdList) {
    return Joiner.on(",").join(userIdList);
  }
}
Here, someData in the source code is of type PCollection<KV<String, String>>; the key is group_id and the value is user_id.
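To make the data shape concrete, here is a minimal plain-Java sketch of what the combine step is meant to produce for each key. It does not use the Dataflow SDK; the class name GroupJoinSketch, the method joinPerKey, and the sample IDs are hypothetical and only stand in for the (group_id, user_id) pairs described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; names here are hypothetical, not Dataflow APIs.
class GroupJoinSketch {

    // Groups (group_id, user_id) pairs by group_id and joins each group's
    // user IDs with commas, mirroring the intended result of
    // Combine.perKey(new CombineUserIds()).
    static Map<String, String> joinPerKey(List<String[]> pairs) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        Map<String, String> rows = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : grouped.entrySet()) {
            // The whole joined line for one group is built in memory here,
            // so its size grows with the number of users in that group.
            rows.put(entry.getKey(), String.join(",", entry.getValue()));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String[]> someData = new ArrayList<>();
        someData.add(new String[] {"g1", "u1"});
        someData.add(new String[] {"g2", "u2"});
        someData.add(new String[] {"g1", "u3"});
        for (Map.Entry<String, String> row : joinPerKey(someData).entrySet()) {
            System.out.println(row.getKey() + " -> " + row.getValue());
        }
    }
}
```

Each output value is a single string holding every user ID of its group, which is why one very large group is enough to produce one very large line.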
The full error message is below.
(b997767fac436e5c): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at java.lang.StringBuilder.append(StringBuilder.java:76)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:457)
    at java.lang.StringBuilder.append(StringBuilder.java:166)
    at java.lang.StringBuilder.append(StringBuilder.java:76)
    at com.google.common.base.Joiner.appendTo(Joiner.java:111)
    at com.google.common.base.Joiner.appendTo(Joiner.java:152)
    at com.google.common.base.Joiner.join(Joiner.java:193)
    at com.google.common.base.Joiner.join(Joiner.java:183)
    at com.moloco.dataflow.ml.adhoc.GenerateMLUserProfileSet$CombineUserIds.apply(GenerateMLUserProfileSet.java:189)
    at com.moloco.dataflow.ml.adhoc.GenerateMLUserProfileSet$CombineUserIds.apply(GenerateMLUserProfileSet.java:184)
    at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeToSingleton(Combine.java:1613)
    at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeAccumulators(Combine.java:1591)
    at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeAccumulators(Combine.java:1536)
    at com.google.cloud.dataflow.sdk.transforms.Combine$CombineFn$2.mergeAccumulators(Combine.java:489)
    at com.google.cloud.dataflow.sdk.runners.worker.GroupAlsoByWindowsParDoFnFactory$MergingKeyedCombineFn.extractOutput(GroupAlsoByWindowsParDoFnFactory.java:249)
    at com.google.cloud.dataflow.sdk.runners.worker.GroupAlsoByWindowsParDoFnFactory$MergingKeyedCombineFn.extractOutput(GroupAlsoByWindowsParDoFnFactory.java:216)
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn$KeyedCombineFnRunner.extractOutput(GroupAlsoByWindowsAndCombineDoFn.java:243)
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.closeWindow(GroupAlsoByWindowsAndCombineDoFn.java:206)
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.processElement(GroupAlsoByWindowsAndCombineDoFn.java:192)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226)
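The top frames of the trace show the heap running out while a StringBuilder grows inside Joiner.join, that is, while the entire joined line is being built as one in-memory string. For contrast, the sketch below writes the IDs to an output stream one at a time, so only the current ID has to be held by the joining code. It is plain Java with hypothetical names (StreamingJoinSketch, writeJoined), and it assumes that the user IDs can be iterated lazily and that a writer to the destination is available. How to get either of those inside a Dataflow pipeline is not shown here and is not confirmed by the question.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;

// Plain-Java illustration only; names here are hypothetical, not Dataflow APIs.
class StreamingJoinSketch {

    // Writes "id1,id2,...\n" to out one ID at a time instead of first
    // concatenating every ID into a single String.
    static void writeJoined(Iterable<String> userIds, PrintWriter out) {
        boolean first = true;
        for (String id : userIds) {
            if (!first) {
                out.print(',');
            }
            out.print(id);
            first = false;
        }
        out.print('\n');
    }

    public static void main(String[] args) {
        // A StringWriter stands in for a real file or network stream.
        StringWriter sink = new StringWriter();
        PrintWriter out = new PrintWriter(sink);
        writeJoined(Arrays.asList("u1", "u2", "u3"), out);
        out.flush();
        System.out.print(sink);
    }
}
```

With a real file or network stream in place of the StringWriter, the joining code never holds the full line itself, although the destination and any buffering in between still have to cope with a very long line.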
【Comments】:
-
Are you running the pipeline locally, or in Google Cloud?
-
Hi @polleyg, I'm running on Google Cloud. I also tried the n1-highmem-32 worker type, but it still failed with an out-of-memory exception.
Tags: java google-cloud-dataflow