【Question title】: Dynamic Flink window creation by reading the details from Kafka
【Posted】: 2023-03-04 10:32:01
【Question description】:

Suppose a Kafka message contains the Flink window size configuration.

I want to read that message from Kafka and create a global window in Flink based on it.

Problem statement:

Can we handle the above scenario with a BroadcastStream?

Or

Is there another way to support this scenario?

【Discussion】:

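  • Do you want to change the window size while the job is running, or just initialize the window from data provided by Kafka?
  • Based on the configuration message: if a message arrives with new information (a new window size), the window must be re-initialized with it, and the size change should take effect without restarting the job.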

Tags: apache-flink flink-streaming


【Solution 1】:

Flink's window API does not support changing the window size dynamically.

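Instead, implement the windowing yourself with a process function. In this case that is a KeyedBroadcastProcessFunction, with the window configuration sent over the broadcast stream.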
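
To make that pattern concrete without a Flink dependency, here is a plain-Java model of it, a sketch under stated assumptions rather than Flink code: `DynamicWindowModel` and its methods are hypothetical names, a `long` field stands in for the broadcast state holding the window size, a `TreeMap` per key stands in for keyed `MapState`, and `advanceWatermark` stands in for event-time timers firing. In a real KeyedBroadcastProcessFunction, `processBroadcastElement` would write the size into broadcast state and `processElement` would read it through its read-only context; the window arithmetic is the same as in the training code's `setTimer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of a KeyedBroadcastProcessFunction with a dynamic window size.
public class DynamicWindowModel {
    // Stand-in for broadcast state: the current window size, shared by all keys.
    private long windowSizeMs;

    // Stand-in for keyed MapState: key -> (window end -> count). TreeMaps keep output order deterministic.
    private final TreeMap<String, TreeMap<Long, Integer>> countsPerKey = new TreeMap<>();

    // Assumption: a default size is used until the first configuration message arrives.
    public DynamicWindowModel(long initialSizeMs) {
        this.windowSizeMs = initialSizeMs;
    }

    // Last millisecond of the window containing `time` (assumes non-negative timestamps).
    public static long windowEnd(long time, long sizeMs) {
        return time - (time % sizeMs) + sizeMs - 1;
    }

    // Analogue of processBroadcastElement: a config message from Kafka updates the size.
    // Windows already opened keep their old end; only later elements use the new size.
    public void processBroadcastElement(long newSizeMs) {
        if (newSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.windowSizeMs = newSizeMs;
    }

    // Analogue of processElement: count the element in the window it falls into under the current size.
    public void processElement(String key, long timestampMs) {
        long end = windowEnd(timestampMs, windowSizeMs);
        countsPerKey.computeIfAbsent(key, k -> new TreeMap<>()).merge(end, 1, Integer::sum);
    }

    // Analogue of event-time timers firing: emit and clear every window whose end <= watermark.
    public List<String> advanceWatermark(long watermarkMs) {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, Integer>> e : countsPerKey.entrySet()) {
            Map<Long, Integer> due = e.getValue().headMap(watermarkMs, true);
            for (Map.Entry<Long, Integer> w : due.entrySet()) {
                emitted.add(e.getKey() + "@" + w.getKey() + "=" + w.getValue());
            }
            due.clear(); // like countInWindow.remove(timestamp) in onTimer
        }
        return emitted;
    }

    public static void main(String[] args) {
        DynamicWindowModel model = new DynamicWindowModel(1_000);
        model.processElement("sensor-1", 100);
        model.processElement("sensor-1", 900);
        model.processBroadcastElement(5_000); // configuration message changes the size
        model.processElement("sensor-1", 1_200);
        System.out.println(model.advanceWatermark(999));   // [sensor-1@999=2]
        System.out.println(model.advanceWatermark(4_999)); // [sensor-1@4999=1]
    }
}
```

One consequence of this design shows up in `main`: after the size switches from 1000 ms to 5000 ms, the element at 1200 lands in the window ending at 4999, which overlaps the window ending at 999 that was opened under the old size. If that overlap matters, one option is to apply a new size only from the next boundary of the old window onward.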

See the Flink training for an example of how to implement time windows with a KeyedProcessFunction (reproduced below):

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyedDataPoint<T> is assumed to be a user-defined POJO (key, timestamp, value) from the
// training material, not a Flink class; it must be on the classpath for this to compile.
public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
    // Keyed, managed state, with an entry for each window (keyed by the window's end timestamp).
    // There is a separate MapState object for each sensor.
    private MapState<Long, Integer> countInWindow;

    boolean eventTimeProcessing;
    int durationMsec;

    /**
     * Create the KeyedProcessFunction.
     * @param eventTime whether or not to use event time processing
     * @param durationMsec window length
     */
    public PseudoWindow(boolean eventTime, int durationMsec) {
        this.eventTimeProcessing = eventTime;
        this.durationMsec = durationMsec;
    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, Integer> countDesc =
                new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
        countInWindow = getRuntimeContext().getMapState(countDesc);
    }

    @Override
    public void processElement(
            KeyedDataPoint<Double> dataPoint,
            Context ctx,
            Collector<KeyedDataPoint<Integer>> out) throws Exception {

        // Find the window this element belongs to and make sure a timer fires at its end
        long endOfWindow = setTimer(dataPoint, ctx.timerService());

        // Increment the count for that window
        Integer count = countInWindow.get(endOfWindow);
        if (count == null) {
            count = 0;
        }
        count += 1;
        countInWindow.put(endOfWindow, count);
    }

    public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
        long time;

        if (eventTimeProcessing) {
            time = dataPoint.getTimeStampMs();
        } else {
            time = System.currentTimeMillis();
        }
        // Round down to the window start, then take the window's last millisecond
        long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);

        // Flink deduplicates timers per key and timestamp, so each window gets one timer
        if (eventTimeProcessing) {
            timerService.registerEventTimeTimer(endOfWindow);
        } else {
            timerService.registerProcessingTimeTimer(endOfWindow);
        }
        return endOfWindow;
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
        // The timer's timestamp is the window end, which is also the MapState key for that window's count
        KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), timestamp, countInWindow.get(timestamp));
        out.collect(result);
        // Clear the state for the window that just fired
        countInWindow.remove(timestamp);
    }
}
```
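
The window assignment in `setTimer` is plain arithmetic: `time - (time % durationMsec)` rounds down to the window start, and adding `durationMsec - 1` gives the window's last millisecond, which serves as both the timer timestamp and the `MapState` key. The standalone snippet below (the class name is hypothetical) evaluates that expression for a few timestamps. It also shows that the same timestamp maps to a different window once the size changes, which is what happens when a broadcast configuration message updates `durationMsec`.

```java
// Hypothetical standalone check of the endOfWindow expression used in setTimer().
public class WindowEndExample {
    // Window = closed interval [start, start + durationMsec - 1]; returns its last millisecond.
    // Assumes non-negative timestamps: Java's % keeps the sign of the dividend,
    // so negative times would be misaligned.
    public static long endOfWindow(long time, long durationMsec) {
        return time - (time % durationMsec) + durationMsec - 1;
    }

    public static void main(String[] args) {
        System.out.println(endOfWindow(12_345, 1_000)); // 12999: window [12000, 12999]
        System.out.println(endOfWindow(12_000, 1_000)); // 12999: the window start maps to the same window
        System.out.println(endOfWindow(12_345, 5_000)); // 14999: window [10000, 14999] after a size change
    }
}
```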
