【发布时间】:2018-10-26 13:05:50
【问题描述】:
我在使用 Confluent 4.1 的流和表中重新输入了数据
1) 创建流
CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');
2) 创建重新加密的流,因为此脚本不起作用,但在此之前它起作用,为什么?
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM partition by root;
然后我创建下一个脚本 s
CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update partition by root;
session_details_stream_rekeyed 的结果正常:
ksql> select * from session_details_stream_rekeyed;
1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001
3) 为主题创建流;
CREATE STREAM voip_details_stream (SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'voipDetails', value_format = 'json');
CREATE STREAM voip_details_stream_update as select SessionIdTime ,SessionIdSeq, CONCAT(SESSIONIDTIME,SESSIONIDSEQ) as root from voip_details_stream partition by SessionIdTime;
CREATE STREAM voip_details_stream_rekeyed6 as select SessionIdTime ,SessionIdSeq,root from voip_details_stream_update partition by root;
ksql> select * from voip_details_stream_rekeyed6;
1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001
4) 创建表
CREATE TABLE voipDetails_table_test(SessionIdTime varchar,SessionIdSeq long,root varchar) WITH (kafka_topic='VOIP_DETAILS_STREAM_REKEYED6', value_format='JSON', KEY='root');
ksql> select * from voip_details_table;
1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001
5) 然后我创建一个左连接
select c.root,u.root from session_details_stream_rekeyed c LEFT JOIN voipDetails_table_test u On c.root = u.root;
1526411477780 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:001 | null
问题出在哪里?
【问题讨论】:
-
您能否更新您的问题以包含您用于
VOIP_DETAILS_TABLE和SESSION_DETAILS_STREAM_REKEYED的STREAM和TABLE定义。 -
您还可以检查您引用的
print语句的输出吗?通常时间戳、密钥和有效负载以逗号分隔。 -
根据定义,我指的是您发布的
CREATE STREAM和CREATE TABLE语句。 -
还有你给出的结果,你能不能提供你运行的
SELECT语句,因为有五个列。我假设它是ROWKEY和ROWTIME。 -
基本上,为了帮助我们帮助您,最好能够轻松重现您所看到的行为,而不是试图推断它:)
标签: apache-kafka ksqldb