【Posted】: 2017-06-05 21:33:13
【Question】:
Here is the Apache Beam PTransform I wrote:
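import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class NormalizeTransform
    extends PTransform<PCollection<String>, PCollection<SimpleTable>> {
  @Override
  public PCollection<SimpleTable> expand(PCollection<String> lines) {
    ExtractFields extract_i = new ExtractFields();
    PCollection<SimpleTable> table = lines
        .apply("Extracting data model fields from lines",
            ParDo.of(extract_i));
    return table;
  }

  public class ExtractFields extends DoFn<String, SimpleTable> {
    // struct (the field layout) and table (the SimpleTable that is output)
    // are instance fields of ExtractFields; their declarations are not shown.
    @ProcessElement
    public void processElement(ProcessContext c) {
      try {
        String line = c.element();
        // fill table: copy each fixed-width slice of the line into the
        // SimpleTable field named st.Field_Name, using reflection
        for (Table_Struct st : this.struct) {
          String o = line.substring(st.pos_1, st.pos_2);
          this.table.getClass().getField(st.Field_Name).set(
              this.table, o);
        }
        c.output(this.table);
      } catch (NoSuchFieldException | IllegalAccessException e) {
        // reflection failures (unknown field name, inaccessible field)
        throw new RuntimeException(e);
      }
    }
  }
}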
I occasionally get the following IllegalMutationException: when I run the same code repeatedly, it sometimes works and sometimes fails.
org.apache.beam.sdk.util.IllegalMutationException: PTransform Transform/Extracting data model fields from lines/ParMultiDo(ExtractFields) mutated value after it was output (new value was ). Values must not be mutated in any way after being output.
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:135)
at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:214)
at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$TimerIterableCompletionCallback.handleResult(ExecutorServiceParallelExecutor.java:268)
at org.apache.beam.runners.direct.TransformExecutor.finishBundle(TransformExecutor.java:168)
at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
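As far as I can tell, nowhere in my code do I explicitly change any value after it has been output. MutationDetectors compares two values: previousValue and newValue. In my case, previousValue is usually one input value and newValue is a different input value. How can the transform be modifying one input value with another?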
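To make the previousValue/newValue comparison concrete, here is a simplified plain-Java model of the idea behind the check (an illustration only, not Beam's actual implementation; `MutationCheckSketch`, `Row`, `encode` and `runBundle` are hypothetical names). Each element is "encoded" to bytes at the moment it is output, and the bytes are compared again when the bundle is committed. If the same mutable object is reused as the output for every line, the snapshot taken for the first line no longer matches at commit time, because the object now holds the second line's data. That would make previousValue look like one input and newValue like another.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of an output-mutation check (not Beam code). */
public class MutationCheckSketch {

  // Stand-in for SimpleTable: a mutable object with a public field.
  static class Row {
    public String value;
  }

  // Stand-in for a Coder: serializes a Row so snapshots can be compared.
  static byte[] encode(Row r) {
    return String.valueOf(r.value).getBytes(StandardCharsets.UTF_8);
  }

  // An output element together with its encoding captured at output time.
  static class Output {
    final Row row;
    final byte[] snapshot;

    Output(Row row) {
      this.row = row;
      this.snapshot = encode(row);
    }
  }

  /** Processes lines into one bundle; returns true if the commit-time check passes. */
  static boolean runBundle(List<String> lines, boolean reuseOutputObject) {
    Row shared = new Row();
    List<Output> bundle = new ArrayList<>();
    for (String line : lines) {
      // Either reuse one instance (like this.table) or create a new one per line.
      Row row = reuseOutputObject ? shared : new Row();
      row.value = line;            // "fill table"
      bundle.add(new Output(row)); // "c.output(...)": snapshot is taken here
    }
    // "commit": re-encode every output and compare it with its snapshot.
    for (Output out : bundle) {
      if (!Arrays.equals(out.snapshot, encode(out.row))) {
        System.out.println("mutated after output: previous="
            + new String(out.snapshot, StandardCharsets.UTF_8)
            + ", new=" + out.row.value);
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("line-1", "line-2");
    System.out.println(runBundle(lines, true));  // prints the mutation message, then false
    System.out.println(runBundle(lines, false)); // true
  }
}
```

In this model, a bundle containing only one line passes even with a shared object, since nothing overwrites it before the commit; this is one possible reason such a failure can appear intermittent, though the real runner's bundling behaviour is not modelled here.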
【Discussion】:
Tags: java google-cloud-dataflow apache-beam