【发布时间】:2021-09-12 21:38:11
【问题描述】:
在e2e Flink SQL tutorial 中,源表被定义为带有时间戳列的Kafka 来源表,在该列上启用了水印
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime AS PROCTIME(), -- generates processing-time attribute using computed column
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- defines watermark on ts column, marks ts as event-time attribute
) WITH (
'connector' = 'kafka', -- using kafka connector
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'earliest-offset', -- reading from the beginning
'properties.bootstrap.servers' = 'kafka:9094', -- kafka broker address
'format' = 'json' -- the data format is json
);
只要 GROUP BY 是在 ts 字段上由 TUMBLE 生成的,这看起来很自然(因为 Flink 知道何时触发/弹出窗口)但在教程中间我们看到以下表达式
INSERT INTO cumulative_uv
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv
FROM (
SELECT
DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str,
user_id
FROM user_behavior)
GROUP BY date_str;
在这里我们看到 GROUP BY 是在派生 date_str 字段上创建的,但是水印在这里是如何工作的? Flink 如何决定何时“关闭” date_str 存储桶?由于date_str 是ts 之上的一些功能,它必须以某种方式理解ts 的水印更新将如何转换为date_str 字段的水位,这对我来说似乎是不可行的。它在内部是如何工作的,Flink 是否将所有遇到的记录存储在它的状态中?
【问题讨论】:
标签: apache-flink flink-streaming flink-sql