【Question title】: Streaming Flink SQL with GROUP BY over a non-timestamp column
【Posted】: 2021-09-12 21:38:11
【Question】:

In the e2e Flink SQL tutorial, the source table is defined as a Kafka-backed table with a timestamp column, and a watermark is declared on that column:

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),   -- generates processing-time attribute using computed column
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- defines watermark on ts column, marks ts as event-time attribute
) WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'kafka:9094',  -- kafka broker address
    'format' = 'json'  -- the data format is json
);

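As long as the GROUP BY is done over a TUMBLE window on the ts field, this looks natural, because Flink knows when to fire and close the window. But in the middle of the tutorial we see the following statement: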

INSERT INTO cumulative_uv
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv
FROM (
  SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
    SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str,
    user_id
  FROM user_behavior)
GROUP BY date_str;

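Here the GROUP BY is on the derived date_str field, so how do watermarks work in this case? How does Flink decide when to "close" a date_str bucket? Since date_str is some function on top of ts, Flink would somehow have to understand how a watermark update on ts translates into a watermark on date_str, which does not seem feasible to me. How does this work internally? Does Flink keep every record it has seen in its state?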

【Comments】:

    Tags: apache-flink flink-streaming flink-sql


    【Answer 1】:

    The link below may help you understand how watermarks are generated and propagated, especially the section "How Operators Process Watermarks".

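    In this example the watermark is generated from ts in the source operator. Downstream operators only handle the watermarks they receive, and this has nothing to do with the date_str field.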

    public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
        // ......

        @Override
        public void open() throws Exception {
            super.open();
    
            timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
            watermarkGenerator =
                    emitProgressiveWatermarks
                            ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                            : new NoWatermarksGenerator<>();
    
            wmOutput = new WatermarkEmitter(output);
    
            watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
            if (watermarkInterval > 0 && emitProgressiveWatermarks) {
                final long now = getProcessingTimeService().getCurrentProcessingTime();
                getProcessingTimeService().registerTimer(now + watermarkInterval, this);
            }
        }
    
        @Override
        public void processElement(final StreamRecord<T> element) throws Exception {
            final T event = element.getValue();
            final long previousTimestamp =
                    element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
            final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);
    
            element.setTimestamp(newTimestamp);
            output.collect(element);
            watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
        }
    
        // ......
    
        @Override
        public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark)
                throws Exception {
            // if we receive a Long.MAX_VALUE watermark we forward it since it is used
            // to signal the end of input and to not block watermark progress downstream
            if (mark.getTimestamp() == Long.MAX_VALUE) {
                wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
            }
        }
        // ......
    }
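
To make the point above concrete, here is a minimal sketch in plain Java (no Flink dependency) of what the declaration `WATERMARK FOR ts AS ts - INTERVAL '5' SECOND` amounts to: each record contributes a candidate watermark of `ts` minus five seconds, and the watermark only ever moves forward. The class `WatermarkSketch` and its methods are hypothetical names made up for illustration, and the sketch ignores periodic emission (the `watermarkInterval` timer in the operator above), so treat it as a toy model rather than Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a bounded-delay watermark on the ts column; not Flink code. */
public class WatermarkSketch {
    /** The 5-second delay taken from the DDL in the question. */
    static final long DELAY_MS = 5_000L;

    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    /** Called once per record; only the event timestamp is inspected. */
    public void onEvent(long tsMillis) {
        long candidate = tsMillis - DELAY_MS;
        // Watermarks must be monotonic, so late records do not move it back.
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
            emitted.add(candidate);
        }
    }

    /** Watermarks emitted so far, in emission order. */
    public List<Long> emittedWatermarks() {
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch();
        // Made-up event timestamps in milliseconds; the third one is out of order.
        for (long ts : new long[] {10_000L, 12_000L, 11_000L, 20_000L}) {
            wm.onEvent(ts);
        }
        System.out.println(wm.emittedWatermarks()); // prints [5000, 7000, 15000]
    }
}
```

Nothing in this logic looks at any column other than the timestamp, which is why a derived field such as date_str needs no watermark of its own.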
    

    https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
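
As for the state question: as far as I understand, a GROUP BY without a window in Flink SQL runs as a continuous aggregation. A date_str bucket is never "closed"; the result row for a key is updated every time a new input row for that key arrives, and watermarks play no part in triggering it. The state holds one accumulator per key rather than every raw record, although for COUNT(DISTINCT user_id) that accumulator has to remember the distinct user ids. This state is only cleaned up if a state TTL such as `table.exec.state.ttl` is configured. The sketch below imitates that behaviour in plain Java; `CumulativeUvSketch`, `Acc` and `onRow` are hypothetical names and the data is made up, so it is an illustration under these assumptions and not how Flink implements the operator.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of GROUP BY date_str with MAX(time_str) and COUNT(DISTINCT user_id). */
public class CumulativeUvSketch {
    /** Per-key accumulator: largest time_str so far and the distinct user ids. */
    static final class Acc {
        String maxTimeStr = "";
        final Set<Long> users = new HashSet<>();
    }

    /** Keyed state: one accumulator per date_str, kept until it is explicitly expired. */
    private final Map<String, Acc> state = new HashMap<>();

    /** Processes one input row and returns the updated result row for its key. */
    public String onRow(String dateStr, String timeStr, long userId) {
        Acc acc = state.computeIfAbsent(dateStr, k -> new Acc());
        if (timeStr.compareTo(acc.maxTimeStr) > 0) {
            acc.maxTimeStr = timeStr;
        }
        acc.users.add(userId);
        return dateStr + "," + acc.maxTimeStr + "," + acc.users.size();
    }

    public static void main(String[] args) {
        CumulativeUvSketch agg = new CumulativeUvSketch();
        // Each call emits an updated row for the same key; no bucket is ever closed.
        System.out.println(agg.onRow("2021-01-01", "10:00", 1L)); // 2021-01-01,10:00,1
        System.out.println(agg.onRow("2021-01-01", "10:10", 2L)); // 2021-01-01,10:10,2
        System.out.println(agg.onRow("2021-01-01", "10:10", 1L)); // 2021-01-01,10:10,2
    }
}
```

Because every input row can change an already emitted result, the sink has to accept updates, for example by upserting on date_str as the key.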

    【Comments】:
