【问题标题】:Can I convert from Table to Stream in KSQL?我可以在 KSQL 中从表转换为流吗?
【发布时间】:2020-04-29 20:41:30
【问题描述】:

我正在使用 KSQL 在 kafka 中工作。我想在 5 分钟内找出不同 DEV_NAME(ROWKEY) 中的最后一行。因此,我创建了流和聚合表以供进一步加入。

通过下面的 KSQL,我创建了一个表格,用于在 5 分钟内为不同的 DEV_NAME 找出最后一行

CREATE TABLE TESTING_TABLE  AS
SELECT  ROWKEY AS DEV_NAME, max(ROWTIME) as LAST_TIME 
    FROM TESTING_STREAM WINDOW TUMBLING (SIZE 5 MINUTES)
    GROUP BY ROWKEY;

那么,我想一起加入:

CREATE STREAM TESTING_S_2 AS 
  SELECT *
    FROM TESTING_S  S
        INNER JOIN TESTING_T T
        ON    S.ROWKEY = T.ROWKEY
    WHERE  
    S.ROWTIME = T.LAST_TIME;

但是出现了错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.streams.kstream.TimeWindowedSerializer) is not compatible to the actual key type (key type: org.apache.kafka.connect.data.Struct). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

应该是WINDOW TUMBLING功能改变了我的ROWKEY风格

(e.g. DEV_NAME_11508 -> DEV_NAME_11508 : Window{start=157888092000 end=-}       

因此,如果不设置 Serdes,我可以从表转换为流并设置 PARTITION BY DEV_NAME 吗?

【问题讨论】:

  • KSQL 只支持字符串键,我上次检查过

标签: apache-kafka streaming ksqldb


【解决方案1】:

正如您所确定的,问题在于您的表格是窗口表格,这意味着表格的键是窗口的,您无法使用非窗口键查看窗口表格。

就目前而言,您的表将为每 5 分钟窗口的每个 ROWKEY 生成一个唯一行。然而,除了最近的窗口之外,您似乎什么都不关心。可能是您不需要表格中的窗口,例如

CREATE TABLE TESTING_TABLE AS 
   SELECT 
     ROWKEY AS DEV_NAME, 
     max(ROWTIME) as LAST_TIME  
   FROM TESTING_STREAM 
   WHERE ROWTIME > (UNIX_TIMESTAMP() - 300000) 
   GROUP BY ROWKEY;

将跟踪每个键的最大时间戳,忽略超过 5 分钟的任何时间戳。 (当然,这个检查只在收到事件的时候做,5分钟后行不会被删除)。

另外,这个连接:

CREATE STREAM TESTING_S_2 AS 
  SELECT *
    FROM TESTING_S  S
        INNER JOIN TESTING_T T
        ON    S.ROWKEY = T.ROWKEY
    WHERE  
    S.ROWTIME = T.LAST_TIME;

由于竞争条件,几乎可以肯定没有按照您的想法做,也不会按照您想要的方式工作。

尚不清楚您要达到的目标。添加有关您的源数据和所需输出的更多信息可能有助于人们为您提供解决方案。

【讨论】:

  • 这个问题有什么解决办法吗?
猜你喜欢
  • 2018-10-15
  • 1970-01-01
  • 2019-08-01
  • 2015-09-10
  • 1970-01-01
  • 1970-01-01
  • 2013-01-14
  • 1970-01-01
  • 2019-09-01
相关资源
最近更新 更多