【Question title】: Define KSQL STRUCT on JSON valued topic with different types
【Posted】: 2018-11-22 21:59:00
【Question】:

(Edit: modified slightly to better reflect intent, and more substantially because progress has been made.)

A topic "t_raw" receives messages of several types, all of which share a common "type" key:

{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
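To make the shape of the problem concrete, here is a minimal Python sketch. It uses only the standard `json` module and no Kafka client, and the `messages` list and `group_by_type` helper are illustrative only. It groups the four sample messages by their `type` key and prints which `data` keys each type carries:

```python
import json

# The four sample messages from the t_raw topic, as raw JSON strings.
messages = [
    '{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}',
    '{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}',
    '{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}',
    '{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}',
]


def group_by_type(raw_messages):
    """Hypothetical helper: bucket each message's "data" payload by its top-level "type"."""
    groups = {}
    for raw in raw_messages:
        msg = json.loads(raw)
        groups.setdefault(msg["type"], []).append(msg["data"])
    return groups


groups = group_by_type(messages)
for msg_type, payloads in groups.items():
    # Union of the keys seen under "data" for this message type.
    keys = sorted(set().union(*(p.keys() for p in payloads)))
    print(msg_type, keys)
# key1 ['a', 'b', 'ts']
# key2 ['a', 'c', 'd', 'ts']
```

Every type shares `ts` and `a`, while the remaining fields differ per type. That is why a single fixed schema for `data` seems awkward at first.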

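Ultimately, I need to split this into other streams and then chop/aggregate/process them. I would like to use STRUCT for everything, but my current efforts have me doing this: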

create stream raw (type varchar, data varchar) \
  with (kafka_topic='t_raw', value_format='JSON');

for the first level, and then

create stream key1 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
  select \
    extractjsonfield(data, '$.ts') as ts, \
    extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.b') as b \
  from raw where type='key1';
create stream key2 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
  select \
    extractjsonfield(data, '$.ts') as ts, \
    extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.c') as c, \
    extractjsonfield(data, '$.d') as d \
  from raw where type='key2';

This seems to work, but STRUCT was added recently: is there a way to use it instead of extractjsonfield as above?

ksql> select * from key1;
1542741621100 | null | 2018-11-20 19:20:21.1 | 1 | hello
1542741623300 | null | 2018-11-20 19:20:23.3 | 2 | hello2
^CQuery terminated
ksql> select * from key2;
1542741622200 | null | 2018-11-20 19:20:22.2 | 1 | 11 | goodbye
1542741624400 | null | 2018-11-20 19:20:24.4 | 3 | 22 | goodbye2
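The `TIMESTAMP='ts'` / `timestamp_format='yyyy-MM-dd HH:mm:ss.S'` clause is what turns the string `ts` into the leading epoch-millisecond column in the output above. Below is a rough Python equivalent. It assumes the strings are interpreted as UTC, which matches the values shown (for example `1542741621100` for `2018-11-20 19:20:21.1`), although KSQL may instead apply the server's time zone. The function name is hypothetical:

```python
import calendar
from datetime import datetime


def ts_to_epoch_millis(ts):
    """Parse a 'yyyy-MM-dd HH:mm:ss.S'-style string to epoch ms, treating it as UTC."""
    # %f accepts 1-6 fractional digits, so ".1" becomes 100000 microseconds.
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")
    # calendar.timegm reads the time tuple as UTC; integer maths avoids float rounding.
    return calendar.timegm(dt.timetuple()) * 1000 + dt.microsecond // 1000


print(ts_to_epoch_millis("2018-11-20 19:20:21.1"))  # 1542741621100
print(ts_to_epoch_millis("2018-11-20 19:20:23.3"))  # 1542741623300
```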

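If not with STRUCT, is there a straightforward way to do this with vanilla Kafka Streams (vice ksql, hence the tag)?

Is there a more Kafka-esque/efficient/elegant way to parse this? I cannot define it as an empty STRUCT<>: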

ksql> CREATE STREAM some_input ( type VARCHAR, data struct<> ) \
      WITH (KAFKA_TOPIC='t1', VALUE_FORMAT='JSON');
line 1:52: extraneous input '<>' expecting {',', ')'}

Some (not-so-recent) discussion suggests being able to do something like:

CREATE STREAM key1 ( a INT, b VARCHAR ) AS \
  SELECT data->* from some_input where type = 'key1';

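FYI: the solution above does not work in confluent-5.0.0; a recent patch fixed the extractjsonfield bug and made it work.

The real data has many more message types like these. They all contain the "type" and "data" keys (with no other keys at the top level), and nearly all of them have a "ts" timestamp equivalent nested inside "data".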

【Discussion】:

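  • I know Avro can do this, by giving the c field in data some other default value in the Avro schema, but I'm not sure JSON can work like that.
  • @MatthiasJ.Sax, the reason I added (and kept) apache-kafka-streams is that, since this is a stream and not (yet) a table, it could be handled by non-ksql mechanisms. I'm open to that, even if it isn't my forte. Thanks for the edit, though.
  • @cricket_007, I didn't see anything in the Avro spec that would let me have a conditional "data" definition, nor anything in the definition within create stream ..., so that doesn't surprise me at all. Thanks.
  • In that case, you should mention it in the question :)
  • Editing as we type...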

Tags: apache-kafka-streams apache-kafka ksqldb


【Solution 1】:

Yes, you can do this: KSQL doesn't mind if a column isn't present, you just get a null value.

Test data setup

Populate the topic with some test data:

kafkacat -b kafka:29092 -t t_raw -P <<EOF
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
EOF

Dump the topic to the KSQL console to inspect it:

ksql> PRINT 't_raw' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"ROWTIME":1542965737437,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
^CTopic printing ceased
ksql>

Model the source data stream

Create a stream over it. Note the use of STRUCT and that every possible column is declared:

CREATE STREAM T (TYPE VARCHAR, \
                 DATA STRUCT< \
                      TS VARCHAR, \
                      A INT, \
                      B VARCHAR, \
                      C INT, \
                      D VARCHAR>) \
        WITH (KAFKA_TOPIC='t_raw',\
              VALUE_FORMAT='JSON');
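Conceptually, declaring `DATA` as a `STRUCT` containing the union of all possible fields gives every row every field, with `null` wherever the JSON lacks that key. Below is a minimal Python sketch of that projection. The `DATA_FIELDS` list and `to_struct` helper are illustrative only. The sketch looks up the lower-case JSON key for each upper-case declared field because the sample data uses lower-case keys. It is not a description of KSQL's internal deserializer:

```python
import json

# Union of every field any message type may carry under "data",
# in the same order as the STRUCT declaration above.
DATA_FIELDS = ["TS", "A", "B", "C", "D"]


def to_struct(raw):
    """Project one raw message onto (TYPE, DATA) using a fixed field list.

    Keys missing from the JSON become None, mirroring the nulls KSQL returns.
    """
    msg = json.loads(raw)
    data = msg.get("data", {})
    struct = {field: data.get(field.lower()) for field in DATA_FIELDS}
    return msg.get("type"), struct


row = to_struct('{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}')
print(row)
# ('key1', {'TS': '2018-11-20 19:20:21.1', 'A': 1, 'B': 'hello', 'C': None, 'D': None})
```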

Set the offset to earliest so that we can query the whole topic, then use KSQL to access the full stream:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql>
ksql> SELECT * FROM T;
1542965737436 | null | key1 | {TS=2018-11-20 19:20:21.1, A=1, B=hello, C=null, D=null}
1542965737436 | null | key2 | {TS=2018-11-20 19:20:22.2, A=1, B=null, C=11, D=goodbye}
1542965737436 | null | key1 | {TS=2018-11-20 19:20:23.3, A=2, B=hello2, C=null, D=null}
1542965737437 | null | key2 | {TS=2018-11-20 19:20:24.4, A=3, B=null, C=22, D=goodbye2}
^CQuery terminated

Query each type individually, using the -> operator to access nested elements:

ksql> SELECT DATA->A,DATA->B FROM T WHERE TYPE='key1'  LIMIT 2;
1 | hello
2 | hello2

ksql> SELECT DATA->A,DATA->C,DATA->D FROM T WHERE TYPE='key2' LIMIT 2;
1 | 11 | goodbye
3 | 22 | goodbye2
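The `WHERE TYPE='key1'` filter combined with `DATA->A` amounts to a row filter plus a nested-field lookup. Below is a self-contained Python sketch over rows that have already been projected. The `rows` list reproduces the four rows of `SELECT * FROM T` above, and `select_fields` is a hypothetical helper, not a KSQL API:

```python
# (TYPE, DATA) rows as produced by the STRUCT-typed stream T above.
rows = [
    ("key1", {"TS": "2018-11-20 19:20:21.1", "A": 1, "B": "hello", "C": None, "D": None}),
    ("key2", {"TS": "2018-11-20 19:20:22.2", "A": 1, "B": None, "C": 11, "D": "goodbye"}),
    ("key1", {"TS": "2018-11-20 19:20:23.3", "A": 2, "B": "hello2", "C": None, "D": None}),
    ("key2", {"TS": "2018-11-20 19:20:24.4", "A": 3, "B": None, "C": 22, "D": "goodbye2"}),
]


def select_fields(rows, wanted_type, fields, limit=None):
    """Rough analogue of SELECT DATA->f1, ... FROM T WHERE TYPE=? LIMIT n."""
    out = []
    for row_type, data in rows:
        if row_type != wanted_type:
            continue  # WHERE TYPE = wanted_type
        out.append(tuple(data[f] for f in fields))  # DATA->f for each field
        if limit is not None and len(out) >= limit:
            break  # LIMIT n
    return out


print(select_fields(rows, "key1", ["A", "B"], limit=2))       # [(1, 'hello'), (2, 'hello2')]
print(select_fields(rows, "key2", ["A", "C", "D"], limit=2))  # [(1, 11, 'goodbye'), (3, 22, 'goodbye2')]
```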

Persist the data in separate Kafka topics

Populate the target topics with the separated data:

ksql> CREATE STREAM TYPE_1 AS SELECT DATA->TS, DATA->A, DATA->B FROM T WHERE TYPE='key1';

Message
----------------------------
Stream created and running
----------------------------
ksql> CREATE STREAM TYPE_2 AS SELECT DATA->TS, DATA->A, DATA->C, DATA->D FROM T WHERE TYPE='key2';

Message
----------------------------
Stream created and running
----------------------------

Schemas of the new streams:

ksql> DESCRIBE TYPE_1;

Name                 : TYPE_1
Field    | Type
--------------------------------------
ROWTIME  | BIGINT           (system)
ROWKEY   | VARCHAR(STRING)  (system)
DATA__TS | VARCHAR(STRING)
DATA__A  | INTEGER
DATA__B  | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> DESCRIBE TYPE_2;

Name                 : TYPE_2
Field    | Type
--------------------------------------
ROWTIME  | BIGINT           (system)
ROWKEY   | VARCHAR(STRING)  (system)
DATA__TS | VARCHAR(STRING)
DATA__A  | INTEGER
DATA__C  | INTEGER
DATA__D  | VARCHAR(STRING)
--------------------------------------
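As the `DESCRIBE` output shows, selecting `DATA->TS` without an alias produces a flat column named `DATA__TS`: the struct name, two underscores, then the field name. The Python sketch below imitates that naming while splitting rows into per-type outputs. The `PROJECTIONS` mapping and the `flatten` and `split_by_type` helpers are hypothetical. In KSQL itself, an explicit alias in the `SELECT` should be the usual way to choose friendlier column names:

```python
# Which nested fields each target stream keeps, mirroring the two CSAS statements.
PROJECTIONS = {
    "key1": ("TYPE_1", ["TS", "A", "B"]),
    "key2": ("TYPE_2", ["TS", "A", "C", "D"]),
}


def flatten(struct_name, data, fields):
    """Name each selected nested field <STRUCT>__<FIELD>, as in DESCRIBE TYPE_1."""
    return {f"{struct_name}__{field}": data[field] for field in fields}


def split_by_type(rows):
    """Route (TYPE, DATA) rows into per-type lists of flattened records."""
    outputs = {target: [] for target, _ in PROJECTIONS.values()}
    for row_type, data in rows:
        if row_type in PROJECTIONS:
            target, fields = PROJECTIONS[row_type]
            outputs[target].append(flatten("DATA", data, fields))
    return outputs


rows = [
    ("key1", {"TS": "2018-11-20 19:20:21.1", "A": 1, "B": "hello", "C": None, "D": None}),
    ("key2", {"TS": "2018-11-20 19:20:22.2", "A": 1, "B": None, "C": 11, "D": "goodbye"}),
]
print(split_by_type(rows))
```

A dict-driven routing table like this keeps the split generic. Adding another message type only needs a new `PROJECTIONS` entry, much as it only needs one more `CREATE STREAM ... AS SELECT` in KSQL.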

The topics backing each KSQL stream:

ksql> LIST TOPICS;

Kafka Topic                 | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
t_raw                       | true       | 1          | 1                  | 2         | 2
TYPE_1                      | true       | 4          | 1                  | 0         | 0
TYPE_2                      | true       | 4          | 1                  | 0         | 0
---------------------------------------------------------------------------------------------------------

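【Discussion】:

  • Thanks! Nice walkthrough. I don't yet know how best to benchmark this... do you know whether there is any "significant" difference (in speed, robustness, etc.) between this combined struct approach and the extractjsonfield approach?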