【问题标题】:How to get partition key or other columns when "group by" is used in KSQL?在 KSQL 中使用“分组依据”时如何获取分区键或其他列?
【发布时间】:2019-07-12 06:47:11
【问题描述】:

基本上,当我在查询中使用group by 表达式时,我会尝试获取所有列。

从主题创建流

CREATE STREAM events_stream \
      ( \
     account VARCHAR, \
     event_id VARCHAR, \
     user_name VARCHAR, \
     event_name VARCHAR, \
     source VARCHAR, \
     message VARCHAR, \
     timestamp STRUCT<iMillis INTEGER>) \
    WITH (KAFKA_TOPIC='console_failure', VALUE_FORMAT='JSON');

从上面的流创建一个表。

ksql> CREATE TABLE events_table AS \
      SELECT source, count(*) \
      FROM events_stream \
      WINDOW TUMBLING (SIZE 60 SECONDS) \
      WHERE account = '1111111111' \
                  GROUP BY source \
                  HAVING count(*) > 3;

生成此消息 4 次。

ip="10.10.10.10"

data = {
        "account": "1111111111",
        "event_id": "4cdabe46-690d-494a-a37e-6e455781d8b4",
        "user_name": "shakeel",
        "event_name": "some_event",
        "source": "127.0.0.1",
        "message": "message related to event",
        "timestamp": {
            "iMillis": 1547543309000
             }
        }

producer.send('console_failure', key='event_json', value=dict(data)

这按预期工作! 但是如何获取匹配结果的其他字段(例如:用户名、消息等)?

ksql> select * from events_table;
1550495772262 | 10.10.10.10 : Window{start=1550495760000 end=-} | 10.10.10.10 | 4
ksql> 

使用后我的理解是使用group by语句时可能无法获取其他列。

ksql> CREATE TABLE events_table1 AS \
>      SELECT source, event_id, \
>               count(*) \
>     FROM events_stream \
>     WINDOW TUMBLING (SIZE 60 SECONDS) \
>      WHERE account = '1111111111' \
>                  GROUP BY source \
>                  HAVING count(*) > 3;
Group by elements should match the SELECT expressions.
ksql>

我们可以通过重新加密流来实现这一点吗?

阅读this 后,我尝试使用event_id 字段重新设置流的密钥,但不确定如何在group by 语句中使用分区键。

以下是我尝试重新生成密钥时遇到的错误。

ksql> CREATE STREAM events_stream_rekey AS SELECT * FROM events_stream PARTITION BY event_id;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>
ksql> SELECT ROWKEY, EVENT_ID FROM events_stream_rekey;
4cdabe46-690d-494a-a37e-6e455781d8b4 | 4cdabe46-690d-494a-a37e-6e455781d8b4
ksql>

ksql> CREATE TABLE  events_table2 AS \
>      SELECT source, \
>               count(*), \
>     WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'),
>     WINDOW TUMBLING (SIZE 60 SECONDS) \
>      WHERE account = '1111111111' \
>                  GROUP BY source \
>                  HAVING count(*) > 3;
line 1:70: extraneous input 'WITH' expecting {'(', 'NOT', 'NO', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'CAST', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', '+', '-', '*', STRING, BINARY_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}


KSQL 版本详情:CLI v5.1.0、Server v5.1.0

-------------- 复制步骤 ------ --------

生产者脚本:该脚本将在不到 30 秒的窗口内生成 4 条消息。

import time
import uuid
from kafka import KafkaProducer
from json import dumps

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))

for i in range(1, 5):
    time.sleep(1)
    data = {
        "account": "1111111111",
        "event_id": str(uuid.uuid4()),
        "user_name": "user_{0}".format(i),
        "event_name": "event_{0}".format(i),
        "source": "10.0.9.1",
        "message": "message related to event {0}".format(i),
        "timestamp": {
            "iMillis": int(round(time.time() * 1000))
        }
    }
    time.sleep(2)
    producer.send('testing_topic', value=data)

关于使用来自 testing_topic 的消息(使用普通的消费者脚本)。

{'account': '1111111111', 'event_id': 'c186ba8a-2402-428a-a5d8-c5b8279e14af', 'user_name': 'user_1', 'event_name': 'event_1', 'source': '10.0.9.1', 'message': 'message related to event 1', 'timestamp': {'iMillis': 1551296878444}}
{'account': '1111111111', 'event_id': '4c45bff7-eb40-48a8-9972-301ad24af9ca', 'user_name': 'user_2', 'event_name': 'event_2', 'source': '10.0.9.1', 'message': 'message related to event 2', 'timestamp': {'iMillis': 1551296881465}}
{'account': '1111111111', 'event_id': '4ee14303-e6d1-4847-ae3d-22b49b3ce6eb', 'user_name': 'user_3', 'event_name': 'event_3', 'source': '10.0.9.1', 'message': 'message related to event 3', 'timestamp': {'iMillis': 1551296884469}}
{'account': '1111111111', 'event_id': '3c196ac5-9559-4269-bf51-95b78ce4ffcb', 'user_name': 'user_4', 'event_name': 'event_4', 'source': '10.0.9.1', 'message': 'message related to event 4', 'timestamp': {'iMillis': 1551296887472}}

预期结果:如果消息在相同account 的窗口时间的 30 秒内包含相同的source 地址,那么我希望下一个立即完成 消息(在我的情况下是第 4 条消息,如下所示)。这可以使用 KSQL 实现吗?

{'account': '1111111111', 'event_id': '3c196ac5-9559-4269-bf51-95b78ce4ffcb', 'user_name': 'user_4', 'event_name': 'event_4', 'source': '10.0.9.1', 'message': 'message related to event 4', 'timestamp': {'iMillis': 1551296887472}}

【问题讨论】:

    标签: apache-kafka kafka-consumer-api ksqldb


    【解决方案1】:

    除了罗宾的回答,这个错误:

    line 1:70: extraneous input 'WITH' expecting {'(', 'NOT', 'NO', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'CAST', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', '+', '-', '*', STRING, BINARY_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
    

    指的是您的 WITH 子句位于错误的位置。正确的模式是:

    CREATE TABLE <table name> WITH(...) AS SELECT ...
    

    你的陈述如下:

    ksql> CREATE TABLE events_table2
    >     WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'),
    >     AS
    >     SELECT source, count(*),
    >     WINDOW TUMBLING (SIZE 60 SECONDS)
    >      WHERE account = '1111111111'
    >                  GROUP BY source
    >                  HAVING count(*) > 3;
    

    【讨论】:

    • 是的,我试过这个,但我得到了如下所示的错误。上面的查询对你有用吗?请让我知道执行此查询是否需要任何语法更改。 ` ksql> CREATE TABLE events_table3 > WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'), > AS > SELECT source, count(), > WINDOW TUMBLING (SIZE 60 SECONDS) > WHERE account = '1111111111' > GROUP BY source > HAVING count() > 3;第 2:86 行:输入不匹配 ',' 期待 ';'引起:org.antlr.v4.runtime.InputMismatchException ksql> `
    【解决方案2】:

    消息本身实际上告诉你问题:)

    Group by 元素应该匹配 SELECT 表达式。

    所以在这里,source SELECTGROUP BY 都有 source

    ksql> SELECT source, count(*) \
    >      FROM events_stream \
    >      WINDOW TUMBLING (SIZE 60 SECONDS) \
    >      WHERE account = '1111111111' \
    >                  GROUP BY source \
    >                  HAVING count(*) > 3;
    127.0.0.1 | 4
    ^CQuery terminated
    

    要添加其他列,请确保将它们也添加到 SELECT

    ksql> SELECT source, event_id, count(*) \
    >      FROM events_stream \
    >      WINDOW TUMBLING (SIZE 60 SECONDS) \
    >      WHERE account = '1111111111' \
    >                  GROUP BY source, event_id \
    >                  HAVING count(*) > 3;
    127.0.0.1 | 4cdabe46-690d-494a-a37e-6e455781d8b4 | 4
    

    编辑以回答您更新的问题

    我认为这不能[容易]在 SQL(或 KSQL)中完成。您可以通过在聚合操作中包含时间戳来实现类似的效果,例如:

    CREATE TABLE source_alert AS \
    SELECT source, COUNT(*), MAX(timestamp) \
    FROM event_stream WINDOW TUMBLING (SIZE 60 SECONDS) \
    GROUP BY `source` \
    HAVING COUNT(*)>1
    

    然后获取结果表并加入事件流:

    SELECT * \
     FROM event_stream e \
          INNER JOIN \
          source_alert a ON e.source=a.source \
    WHERE e.timestamp=a.timestamp
    

    我没有尝试过,但原则上,它可能会让你到达你想去的地方。

    【讨论】:

    • > 你写的是 Robin,但在我的情况下,event_id 对于每条消息都是不同的,所以我不会使用 event_id 分组。我需要 'event_id' 作为键,以便我可以用来加入另一个表。 > 你能告诉我任何其他更好的方法来获取第四条消息的所有列,这将是上述group by 查询的结果。
    • 如果你没有在你的GROUP BY 中包含event_id,那么将它作为消息键是没有逻辑意义的。您无法聚合但仍包含较低粒度的数据。也许您需要更新您的问题,以更清楚地说明您正在尝试做什么。
    • 感谢 Robin 的回复,我只是在寻找这种查询“select * from table window tumbling (size 30 seconds) where account = '1111111111' group by source with count(*) > 3; ”。如果还不清楚,请告诉我,我很乐意为您提供更多详细信息。
    • 是的,请使用您的输入数据示例和您希望从查询中获得的预期输出来更新您的问题。
    • 感谢您的及时回复,我已经更新了我的问题,其中包含一些重现和预期结果的步骤。如果可以使用 KSQL 实现,请帮助我?
    猜你喜欢
    • 2020-07-04
    • 1970-01-01
    • 1970-01-01
    • 2022-12-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-08
    相关资源
    最近更新 更多