【问题标题】:IllegalMutationException from Beam PTransform来自 Beam PTransform 的 IllegalMutationException
【发布时间】:2017-06-05 21:33:13
【问题描述】:

这是我编写的 Apache Beam PTransform:

public class NormalizeTransform
  extends PTransform<PCollection<String>, PCollection<SimpleTable>> {

@Override
public PCollection<SimpleTable> expand(PCollection<String> lines) {
  ExtractFields extract_i = new ExtractFields();
  PCollection<SimpleTable> table = lines
    .apply("Extracting data model fields from lines",
           ParDo.of(extract_i));
}                                                   

public class ExtractFields extends DoFn<String, SimpleTable> {

@ProcessElement
public void processElement(ProcessContext c){
  try {
    String line = c.element();              
    // fill table
    for (Table_Struct st: this.struct){
      String o = line.substring(st.pos_1, st.pos_2));
      this.table.getClass().getField(st.Field_Name).set(
        this.table, o);                                                                     
    }
    c.output(this.table);
  }
}

偶尔我会收到以下错误IllegalMutationException,这意味着我重复了代码的运行,有时有效,有时无效。

org.apache.beam.sdk.util.IllegalMutationException: PTransform Transform/Extracting data model fields from lines/ParMultiDo(ExtractFields) mutated value  after it was output (new value was ). Values must not be mutated in any way after being output.

at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:135)
at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:214)
at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$TimerIterableCompletionCallback.handleResult(ExecutorServiceParallelExecutor.java:268)
at org.apache.beam.runners.direct.TransformExecutor.finishBundle(TransformExecutor.java:168)
at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

我不认为我在代码中的任何地方专门更改了值的任何输出。 MutationDetectors 将比较两个值:previousValue 和 newValue。在我的例子中,previousValue 通常是一个输入值,newValue 是另一个输入值。 Transform 怎么会尝试使用一个输入值来修改另一个输入值?

【问题讨论】:

    标签: java google-cloud-dataflow apache-beam


    【解决方案1】:

    我不确定this.table 来自哪里。

    但为了帮助您理解错误消息,请记住 processElement 可能会在多个输入上调用。第一次调用将输出this.table。下一个调用将在输出之前改变this.table

    如果这种突变发生在第一次调用输出 this.table 之后并且在下游代码有机会读取 this.table 之前,您将得到不正确的结果。所以,这个错误表明你在引用输出后改变了this.table的内容——这是你不应该做的。

    请考虑 (1) 输出 this.table 的副本或 (2) 将表创建为本地字段。例如:

    @ProcessElement
    public void processElement(ProcessContext c){
      try {
        String line = c.element(); 
        Table table = /* create the table */;             
        // fill table
        for (Table_Struct st: this.struct){
          String o = line.substring(st.pos_1, st.pos_2));
          this.table.getClass().getField(st.Field_Name)
            .set(table, o);                                                                     
        }
        c.output(table);
      }
    }
    

    另请注意,在每个 processElement 中执行反射可能比预期的要慢。如果您可以直接修改字段,那可能会更好。

    【讨论】:

    • 非常感谢您的回复。这就解释了为什么。实际上,我确实担心processElement 中的反射可能会拖累性能。但在这种情况下,表是运行时定义的,不能直接修改,因此我求助于使用反射。你有比反思更好的主意吗?
    • 鉴于它是在运行时定义的,听起来您除了反射之外没有很多选择。这取决于您可以使用哪些其他选项/库来避免反射。
    • 您对其他选项/库有什么建议吗?谢谢@Ben Chambers
    • 其他选项包括使用 AutoValue、Avro 或协议缓冲区从模式生成代码,并使用它们。很大程度上取决于您的实际用例是什么。您可能想更详细地描述您正在尝试做什么,并提出更一般的 Java 问题。
    【解决方案2】:

    还要注意,强制比较反序列化的值,因此它假定反序列化是确定性的。如果对象包含集合,则需要注意元素的顺序。

    【讨论】:

      猜你喜欢
      • 2020-06-14
      • 1970-01-01
      • 2023-01-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-22
      相关资源
      最近更新 更多