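【Question Title】: How to create a KSQL table from a topic with a composite key?
【Posted】: 2020-04-25 10:19:47
【Question Description】:

I have topic data with the fields stringA and stringB, and I want to use the two together as the key when creating a KSQL table from the topic.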

【Question Discussion】:

    Tags: apache-kafka-connect ksqldb confluent-platform


    【Solution 1】:

    Here's an example. First, I'll create and populate a test stream:

    ksql> CREATE STREAM TEST (STRINGA VARCHAR, 
                              STRINGB VARCHAR, 
                              COL3 INT) 
                        WITH (KAFKA_TOPIC='TEST',
                              PARTITIONS=1,
                              VALUE_FORMAT='JSON');
    
     Message
    ----------------
     Stream created
    ----------------
    
    ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',1);
    ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',2);
    ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('C','D',3);
    ksql>
    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
    
    ksql> SELECT * FROM TEST EMIT CHANGES LIMIT 3;
    +--------------+--------+---------+----------+------+
    |ROWTIME       |ROWKEY  |STRINGA  |STRINGB   |COL3  |
    +--------------+--------+---------+----------+------+
    |1578569329184 |null    |A        |B         |1     |
    |1578569331653 |null    |A        |B         |2     |
    |1578569339177 |null    |C        |D         |3     |
    

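    Note that ROWKEY is null.

    Now I'll create a new stream, populated from the first one, that builds the composite column and sets it as the key. I'm also including the original fields themselves, but that's optional if you don't need them: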

    ksql> CREATE STREAM TEST_REKEY AS 
            SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
                   STRINGA, 
                   STRINGB, 
                   COL3 
              FROM TEST 
              PARTITION BY MY_COMPOSITE_KEY ;
    
     Message
    ------------------------------------------------------------------------------------------
     Stream TEST_REKEY created and running. Created by query with query ID: CSAS_TEST_REKEY_9
    ------------------------------------------------------------------------------------------
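    The re-keying query can be modelled in a few lines of plain Python. This is a minimal sketch of the semantics only, not ksqlDB's implementation: records are assumed to be dicts, and `rekey` and its `sep` parameter are hypothetical names. It derives the new key from `STRINGA + STRINGB`, keeps the composite column in the value as the query does, and shows one caveat of plain concatenation: different column pairs such as ('AB', 'C') and ('A', 'BC') produce the same key.

```python
from typing import Iterable, Iterator, Tuple


def rekey(records: Iterable[dict], sep: str = "") -> Iterator[Tuple[str, dict]]:
    """Yield (key, value) pairs keyed on STRINGA + sep + STRINGB.

    Models `SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, ... PARTITION BY ...`:
    the source records have a null key, so the new key is derived from the value.
    """
    for value in records:
        key = value["STRINGA"] + sep + value["STRINGB"]
        # Keep the composite column in the value as well, as the query above does.
        yield key, {"MY_COMPOSITE_KEY": key, **value}


test_rows = [
    {"STRINGA": "A", "STRINGB": "B", "COL3": 1},
    {"STRINGA": "A", "STRINGB": "B", "COL3": 2},
    {"STRINGA": "C", "STRINGB": "D", "COL3": 3},
]

for key, value in rekey(test_rows):
    print(key, value["COL3"])  # prints AB 1, AB 2, CD 3 on separate lines

# Caveat: plain concatenation is ambiguous, so different pairs can collide.
a, _ = next(rekey([{"STRINGA": "AB", "STRINGB": "C"}]))
b, _ = next(rekey([{"STRINGA": "A", "STRINGB": "BC"}]))
print(a, b, a == b)  # ABC ABC True
```

    Passing a separator (for example `sep="|"`, which in KSQL would correspond to `STRINGA + '|' + STRINGB`) keeps those two keys distinct, as long as the separator cannot occur inside the values themselves.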
    

    Now you have a stream of data whose key is set to your composite key:

    ksql> SELECT ROWKEY , COL3 FROM TEST_REKEY EMIT CHANGES LIMIT 3;
    +---------+-------+
    |ROWKEY   |COL3   |
    +---------+-------+
    |AB       |1      |
    |AB       |2      |
    |CD       |3      |
    Limit Reached
    Query terminated
    

    You can also inspect the underlying Kafka topic to verify the key:

    ksql> PRINT TEST_REKEY LIMIT 3;
    Format:JSON
    {"ROWTIME":1578569329184,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":1}
    {"ROWTIME":1578569331653,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":2}
    {"ROWTIME":1578569339177,"ROWKEY":"CD","MY_COMPOSITE_KEY":"CD","STRINGA":"C","STRINGB":"D","COL3":3}
    ksql>
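    To double-check the key programmatically, the printed records can be parsed as JSON. This is a small sketch that assumes the `PRINT` output format shown above, where this KSQL version shows `ROWTIME` and `ROWKEY` alongside the value fields; `key_matches_columns` is a hypothetical helper, and the script reads pasted text rather than a live Kafka topic.

```python
import json

# The three records exactly as printed by `PRINT TEST_REKEY LIMIT 3` above.
printed = """\
{"ROWTIME":1578569329184,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":1}
{"ROWTIME":1578569331653,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":2}
{"ROWTIME":1578569339177,"ROWKEY":"CD","MY_COMPOSITE_KEY":"CD","STRINGA":"C","STRINGB":"D","COL3":3}
"""


def key_matches_columns(line: str) -> bool:
    """True if ROWKEY and MY_COMPOSITE_KEY both equal STRINGA + STRINGB."""
    record = json.loads(line)
    expected = record["STRINGA"] + record["STRINGB"]
    return record["ROWKEY"] == expected == record["MY_COMPOSITE_KEY"]


results = [key_matches_columns(line) for line in printed.splitlines()]
print(results)  # [True, True, True]
```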
    

    With that done, we can now declare a table on top of the re-keyed topic:

    CREATE TABLE TEST_TABLE (ROWKEY VARCHAR KEY, 
                             COL3 INT) 
        WITH (KAFKA_TOPIC='TEST_REKEY', VALUE_FORMAT='JSON');
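    In this declaration, `ROWKEY VARCHAR KEY` is taken from the Kafka message key that the `PARTITION BY` step wrote, while `COL3` is read from the JSON value. The following Python sketch models that mapping for a single raw record. It assumes the key bytes are a plain UTF-8 string and uses an assumed value payload; `to_table_row` is a hypothetical helper, not a ksqlDB function.

```python
import json
from typing import Tuple


def to_table_row(key_bytes: bytes, value_bytes: bytes) -> Tuple[str, int]:
    """Map one raw Kafka record onto the (ROWKEY VARCHAR KEY, COL3 INT) schema."""
    rowkey = key_bytes.decode("utf-8")  # ROWKEY: read from the message key
    value = json.loads(value_bytes)     # value: a JSON object, per VALUE_FORMAT='JSON'
    return rowkey, int(value["COL3"])   # only COL3 is declared; other fields are not used


# Assumed value payload for the second TEST_REKEY record.
raw_value = b'{"MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":2}'
print(to_table_row(b"AB", raw_value))  # ('AB', 2)
```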
    

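    From this table we can query the state. Note that the composite key AB shows only its latest value; this is part of table semantics. Compare this with the stream above, where you can see both values, even though the stream and the table are backed by the same Kafka topic: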

    ksql> SELECT * FROM TEST_TABLE EMIT CHANGES;
    +----------------+---------+------+
    |ROWTIME         |ROWKEY   |COL3  |
    +----------------+---------+------+
    |1578569331653   |AB       |2     |
    |1578569339177   |CD       |3     |
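    The "latest value per key" behaviour can be modelled as a dictionary upsert in which every record for a key overwrites the previous one. This is a minimal sketch of table semantics only, assuming the re-keyed records arrive as (key, value) pairs; `materialize` is a hypothetical name, and this is not how ksqlDB stores state internally.

```python
from typing import Dict, Iterable, Tuple


def materialize(changelog: Iterable[Tuple[str, dict]]) -> Dict[str, dict]:
    """Fold a keyed stream into a table: the latest record for each key wins."""
    table: Dict[str, dict] = {}
    for key, value in changelog:
        table[key] = value  # upsert: overwrite any earlier row with this key
    return table


# The TEST_REKEY records, reduced to ROWTIME plus the column TEST_TABLE declares.
rekeyed = [
    ("AB", {"ROWTIME": 1578569329184, "COL3": 1}),
    ("AB", {"ROWTIME": 1578569331653, "COL3": 2}),
    ("CD", {"ROWTIME": 1578569339177, "COL3": 3}),
]

table = materialize(rekeyed)
for key, row in table.items():
    print(row["ROWTIME"], key, row["COL3"])
# 1578569331653 AB 2
# 1578569339177 CD 3
```

    Folding the three stream records this way leaves exactly the two rows returned by `SELECT * FROM TEST_TABLE`.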
    

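    【Discussion】:

    • When I try this, I get an error saying the composite key cannot be resolved: ```ksql> CREATE STREAM myevents_rekey > WITH (KAFKA_TOPIC='myevents_rekey') AS > SELECT myId + '# ' + myExternalId AS COMPOSITE_KEY > FROM myevents > PARTITION BY COMPOSITE_KEY; Line: 5, Col: 18: PARTITION BY column 'COMPOSITE_KEY' cannot be resolved. ``` Any ideas on this?
    • Please start a new question that references this one.
    【Solution 2】:

    Just an update to @Robin Moffat's answer: in PARTITION BY, use the expression itself rather than its alias. That is, use the following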

    CREATE STREAM TEST_REKEY AS 
            SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
                   STRINGA, 
                   STRINGB, 
                   COL3 
              FROM TEST 
              PARTITION BY STRINGA + STRINGB ;
    

    instead of

    CREATE STREAM TEST_REKEY AS 
            SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
                   STRINGA, 
                   STRINGB, 
                   COL3 
              FROM TEST 
              PARTITION BY MY_COMPOSITE_KEY ;
    

    Note: column order matters. This worked for me! (CLI v0.10.1, server v0.10.1)

    【Discussion】:
