[Posted]: 2020-11-05 20:11:11
[Question]:
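I have a class that meets the requirements for being treated as a POJO.
It is the main transport class in my streaming job
(it contains only primitives and a Map<String, String>).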
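For reference, Flink's documented POJO rules are roughly: the class is public (and static if nested), it has a public no-argument constructor, and every non-static, non-transient field is either public or has a public getter and setter. The sketch below is a stdlib-only approximation of those rules for a quick sanity check after adding a field. It is not Flink's TypeExtractor, which also inspects field types and logs the "cannot be used as a POJO type" INFO message mentioned in the comments when it falls back to a generic (Kryo) type. PojoRulesCheck, MissingSetter and this MyPojo are hypothetical stand-ins.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

// Rough approximation of Flink's POJO rules; Flink's own analysis does more.
final class PojoRulesCheck {

    // Hypothetical stand-in for MyPojo after the new String field was added.
    public static class MyPojo {
        public long timestamp;                                // public field: accepted as-is
        private Map<String, String> labels = new HashMap<>(); // private: needs getter + setter
        private String newField;                              // the newly added field

        public MyPojo() {}                                    // public no-arg constructor

        public Map<String, String> getLabels() { return labels; }
        public void setLabels(Map<String, String> labels) { this.labels = labels; }
        public String getNewField() { return newField; }
        public void setNewField(String newField) { this.newField = newField; }
    }

    // Hypothetical counter-example: private field with a getter but no setter.
    public static class MissingSetter {
        private String name;
        public MissingSetter() {}
        public String getName() { return name; }
    }

    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) return false;
        // Nested classes must be static (no hidden reference to an outer instance).
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) return false;
        try {
            if (!Modifier.isPublic(clazz.getDeclaredConstructor().getModifiers())) return false;
        } catch (NoSuchMethodException e) {
            return false; // no no-arg constructor at all
        }
        for (Field f : clazz.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m) || Modifier.isPublic(m)) continue;
            if (!hasAccessors(clazz, f)) return false;
        }
        return true;
    }

    // Looks for public getX()/isX() and setX(<field type>) methods.
    private static boolean hasAccessors(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        return hasPublicMethod(clazz, "set" + suffix, f.getType())
                && (hasPublicMethod(clazz, "get" + suffix) || hasPublicMethod(clazz, "is" + suffix));
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params); // getMethod only finds public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("MyPojo: " + looksLikeFlinkPojo(MyPojo.class));
        System.out.println("MissingSetter: " + looksLikeFlinkPojo(MissingSetter.class));
    }
}
```

Passing this check only says the class shape is right; whether the savepoint can be read still depends on which serializer wrote it.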
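I added a new String field with a corresponding getter and setter,
but if I stop the job running the previous version of the class with a savepoint and try to restart it with the new class,
I get an exception: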
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3b2dbb810ac7d55949cb205a3075facc_(8/8) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 6 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 8 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
...
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 12 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 23 common frames omitted
For some reason, it falls back to Kryo.
I'm using Flink 1.9.3, and according to the documentation this change should be supported.
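The trace goes through Kryo's FieldSerializer.read, and Kryo documents FieldSerializer as not tolerating added fields: field values are written back to back with no field names, so a reader built from the new class reads the old bytes at the wrong offsets. With reference tracking on (MapReferenceResolver appears in the trace), a misaligned int read as a reference ID would plausibly explain an index like 172199998 into a list of size 13. The sketch below illustrates only that failure mode with a hypothetical hand-rolled positional format; it is not Kryo's wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Illustration only: positional encoding without per-field metadata.
final class PositionalLayoutDemo {

    // Writer for the "old" class: the fields the savepoint was taken with.
    static byte[] writeOld(int id, long timestamp) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(id);
            out.writeLong(timestamp);
        }
        return bytes.toByteArray();
    }

    // Reader for the "new" class: one extra field sits between the old ones.
    // Nothing in the bytes says which field they belong to, so the reader takes
    // the first four bytes of the old timestamp as the new field and then runs
    // out of data (EOFException) when it reads the 8-byte timestamp.
    static long[] readNew(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int id = in.readInt();
            int newField = in.readInt();
            long timestamp = in.readLong();
            return new long[] {id, newField, timestamp};
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] savepointBytes = writeOld(7, 1_604_607_071_000L);
        try {
            readNew(savepointBytes);
            System.out.println("read succeeded (unexpected)");
        } catch (EOFException e) {
            System.out.println("misaligned read: " + e);
        }
    }
}
```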
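Based on David's answer, I'm trying to see whether I can migrate my state on the fly before adding the new field to the class.
I have added the @TypeInfo annotation and its factory to MyPojo,
and I'm trying to migrate the state like this: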
lsd = new ListStateDescriptor<>(
        "newName",
        MyPojo.class
);
// migration
TypeToken<LabeledClassWithTimestamp<String>> typeToken = new TypeToken<LabeledClassWithTimestamp<String>>() {};
ListStateDescriptor<LabeledClassWithTimestamp<String>> legacyLSD = new ListStateDescriptor<>(
        "oldName",
        new KryoSerializer<>((Class<LabeledClassWithTimestamp<String>>) typeToken.getRawType(), runtimeContext.getExecutionConfig())
);
ListState<LabeledClassWithTimestamp<String>> legacyState = runtimeContext.getListState(legacyLSD);
try {
    List<MyPojo> newState = new ArrayList<>();
    legacyState.get().forEach(o -> newState.add((MyPojo) o));
    if (!newState.isEmpty()) {
        runtimeContext.getListState(lsd).update(newState);
        legacyState.clear();
    }
} catch (Exception e) {
    LOG.error("Could not migrate state:", e);
}
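Stripped of the Flink types, this migration is a read, convert, write, clear sequence. The sketch below uses a hypothetical in-memory stand-in for ListState (only get, update and clear, which Flink's ListState also provides), with String and Integer standing in for the legacy and new element types. It swaps the (MyPojo) o cast for an explicit conversion function, since a cast only succeeds if the legacy objects already are MyPojo instances at runtime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class StateMigrationSketch {

    // Hypothetical in-memory stand-in for Flink's ListState.
    static final class InMemoryListState<T> {
        private final List<T> values = new ArrayList<>();
        Iterable<T> get() { return values; }
        void update(List<T> newValues) { values.clear(); values.addAll(newValues); }
        void clear() { values.clear(); }
        int size() { return values.size(); }
    }

    // Converts every legacy entry, writes them to the new state only if there
    // was anything to migrate, then clears the legacy state (same order as the
    // snippet above). Returns the number of migrated entries.
    static <O, N> int migrate(InMemoryListState<O> legacy,
                              InMemoryListState<N> current,
                              Function<? super O, ? extends N> convert) {
        List<N> converted = new ArrayList<>();
        for (O old : legacy.get()) {
            converted.add(convert.apply(old));
        }
        if (!converted.isEmpty()) {
            current.update(converted);
            legacy.clear();
        }
        return converted.size();
    }

    public static void main(String[] args) {
        InMemoryListState<String> legacy = new InMemoryListState<>();
        legacy.update(Arrays.asList("a", "bb", "ccc"));
        InMemoryListState<Integer> current = new InMemoryListState<>();
        int migrated = migrate(legacy, current, String::length);
        System.out.println("migrated=" + migrated + ", legacy left=" + legacy.size()
                + ", current=" + current.get());
    }
}
```

As in the original snippet, update replaces whatever the new state already held; that is acceptable only while the new state is known to be empty at migration time.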
However, if I restore the job from the previous savepoint with the new jar,
Flink throws a StateMigrationException in a different operator:
2020-11-08T12:57:59.369Z INFO org.apache.flink.runtime.executiongraph.ExecutionGraph:1511 [flink-akka.actor.default-dispatcher-17] window-operator (1/8) (uid) switched from RUNNING to FAILED. org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
That operator's state only holds integers:
public class SlidingWindowProcessFunction extends ProcessWindowFunction<MyPojo, MyOutput, String, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
            "window-state", Integer.class);
    ...
    @Override
    public void process(String key, Context context, Iterable<MyPojo> iterable, Collector<MyOutput> collector) {
        ...
        for (Integer hash : context.windowState().getListState(LSD).get()) {
            alreadyProcessedHashes.add(hash);
        }
        ...
    }
}
Is the MyPojo serializer relevant to the state of this other operator, even though the POJO class is not used directly in its managed state?
[Discussion]:
- Just to confirm: the logs of the job running the previous version of the POJO don't contain any INFO message similar to
Class <your class> cannot be used as a POJO type...?
- @kkrugler Correct, I did confirm that.
Tags: java serialization apache-flink