【问题标题】:Flink maintaining configuration stateFlink 维护配置状态
【发布时间】:2019-10-09 18:57:09
【问题描述】:

我有一个在 Flink 中维护配置的用例,我真的不知道如何处理。

假设我在某处存储了一些配置,我需要它来进行处理。在Flink作业初始化的时候,我想加载所有的配置。

这个配置也可以在 Flink 作业运行的过程中修改,所以我必须在内存中保存这个配置的状态,并在需要的时候更新它。可以从 KafkaSource 访问配置更新。

这就是我所拥有的:

我有一个函数可以加载整个配置,使其保持状态并将其与我的数据流相关联:

public class MyConfiguration extends RichFlatMapFunction<Row, Row>{
    private transient MapState<String, MyConfObject> configuration;

    @Override
    public void open(MyConfiguration config) throws Exception{
        MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>(
                "configuration",
                BasicTypeInfo.STRING_TYPE_INFO,
                ...
        );
        configuration = getRuntimeContext().getMapState(descriptor);
        configuration.putAll(...);   // Load configuration from somewhere
    }

    @Override
    public void flatMap(Row value, Collector<Row> out) throws Exception {
        MyConfObject conf = configuration.get(...);
        ...               // Associate conf with data
        out.collect(value);
    }
}

我的管道看起来像这样:

DataStream<Row> dataStream = ...; // My data stream
DataStream<Map<String, MyConfObject> streamConf = 
     env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates
        .map(...); 

return dataStream
    .assignTimestampsAndWatermarks(...)
    .flatMap(new MyConfiguration())

    ... //Do some processing

    .map(m -> {
        ObjectMapper objectMapper = new ObjectMapper();
        String json = objectMapper.writeValueAsString(m);
        return json.getBytes();
    });

我想要的是使用配置更新流streamConf 来更新MyConfiguration 平面地图函数中的状态变量。我该怎么做?

【问题讨论】:

    标签: java apache-flink flink-streaming


    【解决方案1】:

    我建议您编写一个从 Kafka 读取配置信息的源,然后通过广播流将配置更改广播到映射函数。映射函数会将完整的当前配置存储在其持久状态中,并且广播流意味着映射函数的所有实例都将获得所有配置更改。

    【讨论】:

    • 我不知道广播流。似乎是我正在寻找的东西。谢谢!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-30
    • 1970-01-01
    • 1970-01-01
    • 2015-03-30
    • 1970-01-01
    相关资源
    最近更新 更多