【问题标题】:MapState always empty in a RichCoFlatMapFunctionMapState 在 RichCoFlatMapFunction 中始终为空
【发布时间】:2020-11-21 09:12:22
【问题描述】:

我正在阅读 2 个流。一个带有记录,一个带有元数据。

我第一次希望我的应用程序通过扫描完整的表来构建元数据并将其保存到 Flink 的 MapState。表上的更新将通过元数据流捕获,并且 MapState 将相应更新。

从第二次开始,我想使用 MapState 而不是读取整个表格。

下面是我对这个功能的实现,但是我的 MapState 总是空的,我在这里做错了吗?

public class CustomCoFlatMap extends RichCoFlatMapFunction<Record, Metadata, Output> {

    private transient DataSource datasource;
    private transient MapState<String, Metadata> metadataState;

    @Inject
    public void setDataSource(DataSource datasource) {
        this.datasource = datasource;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        final RichFunctionComponent component = DaggerRichFunctionComponent.builder()
                .richFunctionModule(RichFunctionModule.builder()
                        .runtimeContext(getRuntimeContext())
                        .build())
                .build();
        component.inject(this);

        // read MapState from snapshot
        metadataState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Cluster>("metadataState",
                TypeInformation.of(new TypeHint<String>(){}), TypeInformation.of(new TypeHint<Metadata>() {})));
    }

    @Override
    public void flatMap2(Metadata metadata, Collector<Output> collector) throws Exception {
        // this should happen only when application starts for first time
        // from next time, application will read from snapshot
        readMetadataForFirstTime();

        // update metadata in MapState
        this.metadataState.put(metadata.getId(), metadata);
    }

    @Override
    public void flatMap1(Record record, Collector<Output> collector) throws Exception {
        
        readMetadataForFirstTime();
        
        Metadata metadata = this.metadataState.get(record.getId());

        Output output = new Output(record.getId(), metadataState.getName(), metadataState.getVersion(), metadata.getType());
    
        collector.collect(output);
    }

    private void readMetadataForFirstTime() throws Exception {
        if(this.metadataState.iterator().hasNext()) {
            // metadataState from snapshot has data
            // not reading from table
            return;
        }

        // do this only once
        // read metadata from table and add it to MapState
        List<Metadata> metadataList = datasource.listAllMetadata();
        for(Metadata metadata: metadataList) {
            this.metadataState.put(metadata.getid(), metadata);
        }

    }
}

编辑:应用程序的其余部分

DataStream<Metadata> metadataKeyedStream =
                env.addSource(metadataStream)
                        .keyBy(Metadata::getId);

SingleOutputStreamOperator<Output> outputStream =
                env.addSource(recordStream)
                        .assignTimestampsAndWatermarks(new RecordTimeExtractor())
                        .keyBy(Record::getId)
                        .connect(metadataKeyedStream)
                        .flatMap(new CustomCoFlatMap());

【问题讨论】:

    标签: apache-flink flink-streaming flink-statefun


    【解决方案1】:

    MapState 是一种键分区状态——这意味着 Flink 为输入流中的每个不同键维护一个单独的 Map&lt;String, Metadata&gt;readMetadataForFirstTime 必须为您的RichCoFlatMapFunction 处理的流中的每个键读取并插入其数据,因为每个键都有一个单独的映射。

    您可能希望以不同的方式处理此问题,具体取决于您要执行的操作。 例如,如果您只想为源流中的每个键存储一个值,那么您应该使用ValueState 而不是MapState。您可以将ValueState 视为分片键/值存储,其中有状态运算符的每个并行实例(例如,RichCoFlatMapFunction)将具有键空间切片的值。 MapState 适用于需要为每个键而不是单个对象存储整个哈希图的情况。

    (如果我误判问题出在哪里,请分享更多上下文,说明应用程序的其余部分如何使用此RichCoFlatMapFunction。)

    【讨论】:

    • 使用ValueState 而不是MapState 是有意义的,因为每个键我只有一个值。但是如何写一个 Metadata 来为每个 Id 赋值状态?我应该将 ValueStateDescriptor 名称保存为 Id 并将值保存为 Metadata 吗?如果是,那么我应该为flatMap1flatMap2 中收到的每条记录执行getRuntimeContext().getState(new ValueStateDescriptor&lt;&gt;(metadata.getId(), TypeInformation.of(new TypeHint&lt;Metadata&gt;() {})));
    • 我建议你看看ci.apache.org/projects/flink/flink-docs-stable/learn-flink/…github.com/apache/flink-training/tree/release-1.11/…中的Flink培训资料,里面详细解释了事情是如何运作的,还包括一些例子。
    • 状态描述符名称应该是一个常量字符串;像“元数据”这样的东西就可以了。如果 Metadata 是 POJO,那么您可以简单地执行 new ValueStateDescriptor&lt;&gt;("metadata", Metadata.class) 之类的操作。
    • 知道了,我想知道如何将所有Metadata 放入ValueState 即,当我调用readMetadataForFirstTime() 时,我想将Metadata 的列表添加到ValueState .但我只看到ValueState 的更新,如何将所有Metadata 列表添加到`ValueState 中?
    • 换句话说,我想读取表Metadata 中的所有条目,并仅在应用程序第一次启动时将它们保存到状态。当应用程序重新启动时,我想从状态中检索它们而不是再次读取表。
    猜你喜欢
    • 2017-10-11
    • 2011-09-10
    • 2016-09-04
    • 1970-01-01
    • 1970-01-01
    • 2017-07-31
    • 2019-07-18
    • 2019-08-18
    • 2011-12-29
    相关资源
    最近更新 更多