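Posted: 2020-04-25 10:19:47
Question:
I have some topic data with the fields stringA and stringB, and I want to use the two of them together as the key when creating a KSQL table from the topic.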
Discussion:
Tags: apache-kafka-connect ksqldb confluent-platform
Here's an example. First, I'll create and populate a test stream:
ksql> CREATE STREAM TEST (STRINGA VARCHAR,
STRINGB VARCHAR,
COL3 INT)
WITH (KAFKA_TOPIC='TEST',
PARTITIONS=1,
VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',1);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',2);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('C','D',3);
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM TEST EMIT CHANGES LIMIT 3;
+--------------+--------+---------+----------+------+
|ROWTIME |ROWKEY |STRINGA |STRINGB |COL3 |
+--------------+--------+---------+----------+------+
|1578569329184 |null |A |B |1 |
|1578569331653 |null |A |B |2 |
|1578569339177 |null |C |D |3 |
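Note that ROWKEY is null.
Now I'll create a new stream, populated from the first one, that builds the composite column and sets it as the key. I'm also including the original fields themselves, but that's optional if you don't need them: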
ksql> CREATE STREAM TEST_REKEY AS
SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY,
STRINGA,
STRINGB,
COL3
FROM TEST
PARTITION BY MY_COMPOSITE_KEY ;
Message
------------------------------------------------------------------------------------------
Stream TEST_REKEY created and running. Created by query with query ID: CSAS_TEST_REKEY_9
------------------------------------------------------------------------------------------
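To make the re-keying step concrete, here is a minimal Python model of what the query above does to each record. It is only an illustration under stated assumptions: records are modeled as `(key, value)` tuples, and the function name `rekey` is hypothetical; it does not use any ksqlDB or Kafka client API.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical model: a record is (key, value); in the TEST stream the key is None
# (ROWKEY was null) and the value holds the JSON columns.
Record = Tuple[Optional[str], Dict[str, object]]


def rekey(records: List[Record]) -> List[Record]:
    """Derive MY_COMPOSITE_KEY = STRINGA + STRINGB and use it as the record key,
    keeping the original columns in the value (as the CSAS query does)."""
    out: List[Record] = []
    for _old_key, value in records:
        composite = str(value["STRINGA"]) + str(value["STRINGB"])
        new_value = {"MY_COMPOSITE_KEY": composite, **value}
        out.append((composite, new_value))
    return out


source = [
    (None, {"STRINGA": "A", "STRINGB": "B", "COL3": 1}),
    (None, {"STRINGA": "A", "STRINGB": "B", "COL3": 2}),
    (None, {"STRINGA": "C", "STRINGB": "D", "COL3": 3}),
]

for key, value in rekey(source):
    print(key, value["COL3"])  # AB 1, AB 2, CD 3 (one per line)
```

One design point worth considering: plain concatenation can make different pairs collide (`'A' + 'BC'` and `'AB' + 'C'` both give `'ABC'`), so if your data allows that, putting a separator between the two fields when building the key may be safer.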
Now you have a stream of data whose key is set to your composite key:
ksql> SELECT ROWKEY , COL3 FROM TEST_REKEY EMIT CHANGES LIMIT 3;
+---------+-------+
|ROWKEY |COL3 |
+---------+-------+
|AB |1 |
|AB |2 |
|CD |3 |
Limit Reached
Query terminated
You can also inspect the underlying Kafka topic to verify the key:
ksql> PRINT TEST_REKEY LIMIT 3;
Format:JSON
{"ROWTIME":1578569329184,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":1}
{"ROWTIME":1578569331653,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":2}
{"ROWTIME":1578569339177,"ROWKEY":"CD","MY_COMPOSITE_KEY":"CD","STRINGA":"C","STRINGB":"D","COL3":3}
ksql>
With that done, we can now declare a table on top of the re-keyed topic:
CREATE TABLE TEST_TABLE (ROWKEY VARCHAR KEY,
COL3 INT)
WITH (KAFKA_TOPIC='TEST_REKEY', VALUE_FORMAT='JSON');
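From this table we can query the state. Note that the composite key AB shows only its latest value, which is part of table semantics (compare this with the stream above, where you can see both values; the stream and the table are backed by the same Kafka topic):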
ksql> SELECT * FROM TEST_TABLE EMIT CHANGES;
+----------------+---------+------+
|ROWTIME |ROWKEY |COL3 |
+----------------+---------+------+
|1578569331653 |AB |2 |
|1578569339177 |CD |3 |
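The difference between the stream and the table over the same topic can be modeled in a few lines of Python: a table treats each keyed record as an upsert, so only the last value per key survives. This is a hypothetical sketch (`materialize_table` is an illustrative name, not a ksqlDB API) that ignores details such as deletes via null-valued records and timestamp handling.

```python
from typing import Dict, List, Tuple


def materialize_table(records: List[Tuple[str, Dict[str, object]]]) -> Dict[str, Dict[str, object]]:
    """Read a keyed topic as a table: each record is an upsert,
    so a later record with the same key replaces the earlier one."""
    table: Dict[str, Dict[str, object]] = {}
    for key, value in records:
        table[key] = value  # last write per key wins
    return table


# The re-keyed topic contents, in offset order (same data the stream shows).
rekeyed = [
    ("AB", {"COL3": 1}),
    ("AB", {"COL3": 2}),
    ("CD", {"COL3": 3}),
]

table = materialize_table(rekeyed)
for key, value in table.items():
    print(key, value["COL3"])  # AB 2, then CD 3
```

Reading the same list as a stream would yield all three records; reading it as a table collapses the two `AB` records into one, which matches the CLI output above.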
Discussion:
Just an update to @Robin Moffat's answer: use the following
CREATE STREAM TEST_REKEY AS
SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY,
STRINGA,
STRINGB,
COL3
FROM TEST
PARTITION BY STRINGA + STRINGB ;
instead of
CREATE STREAM TEST_REKEY AS
SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY,
STRINGA,
STRINGB,
COL3
FROM TEST
PARTITION BY MY_COMPOSITE_KEY ;
Note: the column order matters. This worked for me! (CLI v0.10.1, server v0.10.1)