【发布时间】:2020-04-29 20:41:30
【问题描述】:
我正在使用 KSQL 在 kafka 中工作。我想在 5 分钟内找出不同 DEV_NAME(ROWKEY) 中的最后一行。因此,我创建了流和聚合表以供进一步加入。
通过下面的 KSQL,我创建了一个表格,用于在 5 分钟内为不同的 DEV_NAME 找出最后一行
CREATE TABLE TESTING_TABLE AS
SELECT ROWKEY AS DEV_NAME, max(ROWTIME) as LAST_TIME
FROM TESTING_STREAM WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY ROWKEY;
那么,我想一起加入:
CREATE STREAM TESTING_S_2 AS
SELECT *
FROM TESTING_S S
INNER JOIN TESTING_T T
ON S.ROWKEY = T.ROWKEY
WHERE
S.ROWTIME = T.LAST_TIME;
但是出现了错误:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.streams.kstream.TimeWindowedSerializer) is not compatible to the actual key type (key type: org.apache.kafka.connect.data.Struct). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
应该是WINDOW TUMBLING功能改变了我的ROWKEY风格
(e.g. DEV_NAME_11508 -> DEV_NAME_11508 : Window{start=157888092000 end=-}
因此,如果不设置 Serdes,我可以从表转换为流并设置 PARTITION BY DEV_NAME 吗?
【问题讨论】:
-
KSQL 只支持字符串键,我上次检查过
标签: apache-kafka streaming ksqldb