【问题标题】:Flink schema evolution not working for POJO classFlink 模式演变不适用于 POJO 类
【发布时间】:2020-11-05 20:11:11
【问题描述】:

我有一个类满足被视为 POJO 的要求, 这是我的流媒体工作中的主要传输类 (它只包含原语和Map<String, String>)。 我添加了一个新的String 字段和相应的getter 和setter, 但是如果我停止使用带有保存点的前一个类的作业并尝试使用它重新启动新类, 我得到一个例外:

 java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3b2dbb810ac7d55949cb205a3075facc_(8/8) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
        ... 6 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
        at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 8 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
...
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
        at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
        ... 12 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.get(ArrayList.java:433)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
        ... 23 common frames omitted

由于某种原因,它又回到了 Kryo。

我用的是Flink 1.9.3,according to the documentation这个改动应该支持。


根据大卫的回答,我正在尝试查看是否可以在将新字段添加到类之前即时迁移我的状态。 我已将@TypeInfo 注释及其工厂添加到MyPojo, 我正在尝试像这样迁移状态:

lsd = new ListStateDescriptor<>(
        "newName",
        MyPojo.class
);

// migration

TypeToken<LabeledClassWithTimestamp<String>> typeToken = new TypeToken<LabeledClassWithTimestamp<String>>() {};
ListStateDescriptor<LabeledClassWithTimestamp<String>> legacyLSD = new ListStateDescriptor<>(
        "oldName",
        new KryoSerializer<>((Class<LabeledClassWithTimestamp<String>>) typeToken.getRawType(), runtimeContext.getExecutionConfig())
);

ListState<LabeledClassWithTimestamp<String>> legacyState = runtimeContext.getListState(legacyLSD);
try {
    List<MyPojo> newState = new ArrayList<>();
    legacyState.get().forEach(o -> newState.add((MyPojo) o));

    if (!newState.isEmpty()) {
        runtimeContext.getListState(lsd).update(newState);
        legacyState.clear();
    }
} catch (Exception e) {
    LOG.error("Could not migrate state:", e);
}

但是,如果我使用新 jar 从以前的保存点恢复作业, Flink 在不同的运算符中抛出StateMigrationException

2020-11-08T12:57:59.369Z INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph:1511 [flink-akka.actor.default-dispatcher-17] window-operator (1/8) (uid) switched from RUNNING to FAILED. org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.

那个操作符的状态只保存整数:

public class SlidingWindowProcessFunction extends ProcessWindowFunction<MyPojo, MyOutput, String, TimeWindow> {
    private static final long serialVersionUID = 1L;
    
    private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
            "window-state", Integer.class);
    
    ...
    
    @Override
    public void process(String key, Context context, Iterable<MyPojo> iterable, Collector<MyOutput> collector) {
        ...
        for (Integer hash : context.windowState().getListState(LSD).get()) {
            alreadyProcessedHashes.add(hash);
        }
        ...
    }
}

MyPojo 的序列化程序是否与此其他运算符的状态相关,即使 POJO 类未直接在其托管状态下使用?

【问题讨论】:

  • 只是为了确认一下,使用以前版本的 POJO 的工作流日志不包含任何类似于 Class &lt;your class&gt; cannot be used as a POJO type... 的 INFO 消息
  • @kkrugler 没错,我确实证实了这一点。

标签: java serialization apache-flink


【解决方案1】:

Flink 将接受一个类作为有效的 POJO 类型,即使它包含一个如果不回退到 Kryo 就无法序列化的字段(例如 LIST 或 MAP)。在这种情况下,不会出现关于 Class &lt;your class&gt; cannot be used as a POJO type ... 的 INFO 日志消息,但不会完全支持该类进行状态迁移。

Flink 可以处理 POJO 字段中的 LIST 和 MAP 类型,但不会自动处理(为了避免破坏向后兼容性)。

您可以通过使用 @TypeInfo 注释您的类并为其实现一个 TypeInfoFactory&lt;T&gt; 来为每个字段指定正确的 org.apache.flink.api.common.typeinfo.Types,包括 org.apache.flink.api.common.typeinfo.Types#MAP

可能看起来像这样:

@TypeInfo(MyPojo.MyPojoTypeInfoFactory.class)
public class MyPojo {
  private String data;
  private HashMap<String, String> attributes;

  public static class MyPojoTypeInfoFactory extends TypeInfoFactory<MyPojo> {
    @Override
    public TypeInformation<MyPojo> createTypeInfo(
        Type t, Map<String, TypeInformation<?>> genericParameters) {
      Map<String, TypeInformation<?>> fields =
          new HashMap<String, TypeInformation<?>>() {
            {
              put("data", Types.STRING);
              put("attributes", Types.MAP(Types.STRING, Types.STRING));
            }
          };
      return Types.POJO(MyPojo.class, fields);
    }
  }
}

请注意,Types.MAP 字段不能为空。地图中不允许使用空键,但可以使用空值。

【讨论】:

  • 感谢您提供的信息,但不幸的是它对我不起作用。我的“旧”工作已经发布,所以我无法更改它,并且仅在新 jar 中添加类型信息(将新字段添加到类中)似乎还不够,因为我得到了一个异常 @987654328 @。我不知道字段顺序是否相关,但我在提供给工厂的字段中尝试了许多不同的顺序(并将fields 映射更改为LinkedHashMap),但我总是遇到上述异常。
  • 是的,我是在解释您如何避免遇到的问题,而不是如何修复当前的情况。为此,您可以使用状态处理器 api 将状态从旧序列化程序迁移到新序列化程序。
  • 我想我可以使用 TypeSerializerSnapshot 在其 restoreSerializer 方法中返回 org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer 并迁移到支持进化的东西(如 kryo 的 VersionFieldSerializer),但似乎 @987654335 @只能为每个StateDescriptor指定,没有办法全局配置吗?
  • 我相信您可以为给定的类注册自定义 Kryo 序列化器,然后该类的任何状态描述符都将使用已注册的序列化器。您可以通过registerTypeWithKryoSerializer(Class&lt;?&gt;, Class&lt;? extends Serializer&gt;) 完成此操作。看com.esotericsoftware.kryo.Serializer
  • 我已经更新了问题,添加了我尝试过的内容,不知道您是否还有其他想法。
【解决方案2】:

我无法理解 Flink 内部发生了什么, 但我找到了一种在 2 次升级中实现迁移的方法, 虽然我真的不明白它为什么会起作用。

在第一次升级中,我没有向 POJO 类添加任何新字段, 但我添加了大卫建议的类型信息。 就我而言,关键是, 因为我最初的工作不能再修改,而且 POJO 没有注释, TypeInfoFactoryMap 的类型信息必须指向 Kryo:

put("mapField", Types.GENERIC(Map.class));

然后我添加了一个新的状态描述符而不修改旧的(我的旧状态是根据接口定义的):

ListStateDescriptor<InterfaceWithGeneric<String>> legacyLSD = new ListStateDescriptor<>(
        "oldName",
        TypeInformation.of(new TypeHint<InterfaceWithGeneric<String>>() {})
);

ListStateDescriptor<MyPojo> newLSD = new ListStateDescriptor<>(
        "newName",
        MyPojo.class
);

这样我可以从旧描述符中读取并根据需要初始化新描述符。

在第二次升级中,我可以删除旧描述符并将新字段添加到 POJO, 同时更新TypeInfoFactory。 现有地图的序列化将不得不继续使用 Kryo, 因为我找不到修改它的方法。

在一次升级中添加注释、新字段和新描述符对我来说不起作用。 我也不能重用旧的描述符; 第一次升级可以正常工作, 但在第二次升级中添加新字段再次引发异常。 我不知道为什么有些异常引用了完全不相关的运算符, 但这似乎只是来自状态恢复后端的错误报告。

【讨论】:

  • 很高兴您找到了前进的道路。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-02-27
  • 2021-12-26
  • 1970-01-01
  • 2019-09-24
相关资源
最近更新 更多