【Question title】: Kafka ksql simple join does not work
【Posted】: 2018-10-26 13:05:50
【Question】:

Using Confluent 4.1, I re-keyed the data in both a stream and a table.

1) Create the stream

   CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');

2) Create the re-keyed stream. This script does not work, although it worked before. Why?

CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by root;

Then I created the following scripts:

CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update  partition by root;

The result in session_details_stream_rekeyed looks correct:

ksql> select * from session_details_stream_rekeyed;
      1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001

3) Create the streams for the topic:

 CREATE STREAM voip_details_stream (SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'voipDetails', value_format = 'json');
 CREATE STREAM voip_details_stream_update as select SessionIdTime ,SessionIdSeq, CONCAT(SESSIONIDTIME,SESSIONIDSEQ) as root from voip_details_stream  partition by SessionIdTime;
 CREATE STREAM voip_details_stream_rekeyed6 as select SessionIdTime ,SessionIdSeq,root from voip_details_stream_update  partition by root;



 ksql> select * from voip_details_stream_rekeyed6;
      1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

4) Create the table

 CREATE TABLE voipDetails_table_test(SessionIdTime varchar,SessionIdSeq long,root varchar) WITH (kafka_topic='VOIP_DETAILS_STREAM_REKEYED6', value_format='JSON', KEY='root');

 ksql> select * from voip_details_table;

       1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

5) Then I create a left join

select  c.root,u.root from session_details_stream_rekeyed c LEFT JOIN voipDetails_table_test u On c.root  = u.root;

   1526411477780 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:001 | null

What is the problem?

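【Question comments】:

  • Can you update your question to include the STREAM and TABLE definitions that you are using for VOIP_DETAILS_TABLE and SESSION_DETAILS_STREAM_REKEYED?
  • Can you also check the output of the PRINT statement you referenced? Usually the timestamp, key, and payload are separated by commas.
  • By definitions, I mean the CREATE STREAM and CREATE TABLE statements you issued.
  • Also, for the results you've shown, could you provide the SELECT statement you ran? There are five columns; I'm assuming two of them are ROWKEY and ROWTIME.
  • Basically, to help us help you, it's best to make the behaviour you're seeing easy to reproduce, rather than leaving us to infer it :)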

Tags: apache-kafka ksqldb


【Answer 1】:

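tl;dr When doing a stream-table join, your table messages must already exist, with earlier timestamps, before the stream message is processed. If you re-send the source stream messages after the table topic has been populated, the join will succeed.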
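To make the ordering rule concrete, here is a minimal, simplified model of a stream-table LEFT JOIN, assuming records from both sides are processed strictly in ROWTIME order (real Kafka Streams only approximates this, picking the next record by timestamp on a best-effort basis). The `Record` type and `stream_table_left_join` function are hypothetical illustrations, not KSQL APIs; `None` stands in for KSQL's `null`.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Record:
    rowtime: int  # message timestamp (ROWTIME) in epoch millis
    key: str      # join key, e.g. the re-keyed "root" column
    value: str


def stream_table_left_join(
    stream: List[Record], table_updates: List[Record]
) -> List[Tuple[int, str, str, Optional[str]]]:
    # Merge both inputs into one timeline ordered by ROWTIME.
    # On equal timestamps this sketch applies the table update first
    # (an arbitrary tie-break, not a statement about Kafka Streams).
    timeline = [(r.rowtime, 0, r) for r in table_updates]
    timeline += [(r.rowtime, 1, r) for r in stream]
    timeline.sort(key=lambda item: (item[0], item[1]))

    table: Dict[str, str] = {}  # current table state: latest value per key
    joined = []
    for _, source, rec in timeline:
        if source == 0:
            table[rec.key] = rec.value  # table update: upsert by key
        else:
            # Stream record: look up the table as it is *now*; a later table
            # update does not retroactively change this result.
            joined.append((rec.rowtime, rec.key, rec.value, table.get(rec.key)))
    return joined


if __name__ == "__main__":
    session = [Record(100, "k1", "Foo"), Record(300, "k1", "Foo")]
    voip = [Record(200, "k1", "Bar")]
    for row in stream_table_left_join(session, voip):
        print(row)
```

The stream record at time 100 is joined before the table row for `k1` exists, so it gets `None`; the re-sent record at time 300 finds the table row. This mirrors the behaviour walked through below.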

Example data

Populate the topics using kafkacat (feeding the data on stdin, one message per line):

cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails < /tmp/msgs


cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails < /tmp/msgs
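When kafkacat produces from stdin, each newline-delimited line becomes one message, so the payload should contain exactly one JSON object per line carrying the fields the streams declare. A minimal sketch for sanity-checking such a payload before producing it; `parse_payload` is a hypothetical helper, not part of kafkacat or KSQL, and it checks field names exactly as written.

```python
import json
from typing import Dict, List


def parse_payload(text: str, columns: List[str]) -> List[Dict]:
    """Parse newline-delimited JSON (one message per line) and check declared fields."""
    messages = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # blank lines would otherwise be sent as empty messages
        msg = json.loads(line)  # raises json.JSONDecodeError on a malformed line
        missing = [c for c in columns if c not in msg]
        if missing:
            raise ValueError(f"line {lineno}: missing fields {missing}")
        messages.append(msg)
    return messages


VOIP_PAYLOAD = """\
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
"""

if __name__ == "__main__":
    msgs = parse_payload(VOIP_PAYLOAD, ["SessionIdTime", "SessionIdSeq", "Details"])
    print(len(msgs), "messages")
```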

Verify the topic contents:

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}

Declare the source streams

ksql> CREATE STREAM session_details_stream \
      (Media varchar ,SessionIdTime varchar,SessionIdSeq long) \
      WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> CREATE STREAM voip_details_stream \
      (SessionIdTime varchar,SessionIdSeq long, Details varchar) \
      WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated

Repartition each topic on SessionIdTime+SessionIdSeq

ksql> CREATE STREAM SESSION AS \
      SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root \
      FROM session_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------


ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo


ksql> CREATE STREAM VOIP AS \
      SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details \
      FROM voip_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>
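Both re-keyed streams derive their key with CONCAT(SessionIdTime, SessionIdSeq); judging from the ROWKEY output above, the BIGINT sequence number is appended as plain decimal text with no separator. A hypothetical Python helper mirroring that behaviour, useful for predicting the join keys when preparing test data:

```python
def make_root(session_id_time: str, session_id_seq: int) -> str:
    # Mirrors CONCAT(SessionIdTime, SessionIdSeq) as seen in the output above:
    # the sequence number is appended as decimal text with no separator.
    return f"{session_id_time}{session_id_seq}"


if __name__ == "__main__":
    print(make_root("2018-05-17 11:25:33 BST", 1))
    # Without a separator, different (time, seq) pairs can yield the same key:
    print(make_root("a1", 23) == make_root("a12", 3))
```

The second print shows a general caveat of separator-less concatenation; with the timestamp formats used here a collision is unlikely, but it is worth keeping in mind when choosing a composite key.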

Declare the table

ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) \
      WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');

 Message
---------------
 Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b
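KEY='root' in the table declaration tells KSQL that the root value column holds the same data as the Kafka message key (ROWKEY); it does not re-key the topic by itself, which is why the data is repartitioned on root first. A small sketch of checking that invariant against sampled output; the rows are copied from the SELECT above, and `rows_with_key_mismatch` is a hypothetical helper.

```python
# (ROWTIME, ROWKEY, root, details), copied from the SELECT ... FROM VOIP output above
VOIP_ROWS = [
    (1526553143176, "2018-05-17 11:26:33 BST2", "2018-05-17 11:26:33 BST2", "Bar2"),
    (1526553143176, "2018-05-17 11:25:33 BST1", "2018-05-17 11:25:33 BST1", "Bar1a"),
    (1526553143176, "2018-05-17 11:25:33 BST1", "2018-05-17 11:25:33 BST1", "Bar1b"),
]


def rows_with_key_mismatch(rows):
    """Return rows whose message key (ROWKEY) differs from the declared KEY column."""
    return [row for row in rows if row[1] != row[2]]


if __name__ == "__main__":
    print(rows_with_key_mismatch(VOIP_ROWS) or "ROWKEY matches root in every row")
```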

Join the SESSION stream to the VOIP table

ksql> SELECT s.ROWTIME, s.root, s.media, v.details \
      FROM SESSION s \
      LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null

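Leave the JOIN query above running. Re-send the SESSION messages to the source topic (using kafkacat to send the same messages as above to sessionDetails). The running query then emits: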

1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2

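Per Rohan Desai on the Confluent Community Slack:

The issue is that the records in the stream have rowtimes earlier than the records in the table that you want them to join with. So when a stream record is processed, there is no corresponding record in the table yet.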

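Use ROWTIME to see the message timestamp of the source table's message for a given join key (not to be confused with root, which is built from a timestamp field):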

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2

Compare this with the messages on the source session stream's topic:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo

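The first of these (11:32:10 / 1526553130865) precedes the corresponding VOIP message shown above, and so produced the null join result we saw first. The second is later (11:46:28 / 1526553988639) and produces the successful join we saw afterwards: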

1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
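The comparison above can be reproduced offline. This sketch assumes the KSQL server formatted timestamps in UK summer time (BST, UTC+1), which is consistent with the TIMESTAMPTOSTRING output shown, and uses the same simplified model as before: records are joined strictly in ROWTIME order. The helper names are hypothetical; the timestamps are the ones from the queries above.

```python
from datetime import datetime, timedelta, timezone

# Assumption: timestamps were rendered in BST (UTC+1), matching the output above.
BST = timezone(timedelta(hours=1), "BST")

# ROWTIME of the VOIP record for root '2018-05-17 11:26:33 BST2'
TABLE_ROWTIME = 1526553143176
# ROWTIMEs of the two SESSION records with the same root
SESSION_ROWTIMES = [1526553130865, 1526553988639]


def timestamp_to_string(rowtime_ms: int, tz: timezone = BST) -> str:
    """Rough analogue of TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss')."""
    # Whole seconds only: the pattern has no fractional-seconds field.
    return datetime.fromtimestamp(rowtime_ms // 1000, tz=tz).strftime("%Y-%m-%d %H:%M:%S")


def join_outcome(stream_rowtime: int, table_rowtime: int) -> str:
    """Simplified model: records are processed strictly in ROWTIME order."""
    if stream_rowtime < table_rowtime:
        return "null"  # table row not yet present when the stream row is joined
    if stream_rowtime > table_rowtime:
        return "match"
    return "undetermined"  # equal timestamps: ordering not modelled here


if __name__ == "__main__":
    print("table ", timestamp_to_string(TABLE_ROWTIME))
    for ts in SESSION_ROWTIMES:
        print("stream", timestamp_to_string(ts), f"{ts - TABLE_ROWTIME:+d} ms",
              join_outcome(ts, TABLE_ROWTIME))
```

The first SESSION record is about 12 seconds older than the VOIP record, hence the null; the re-sent one is roughly 14 minutes newer, hence the match.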
