【发布时间】:2018-11-22 21:59:00
【问题描述】:
(修改:为了更好地反映意图而进行了轻微修改,但由于取得了进展而进行了较大的修改。)
一个主题"t_raw"被赋予多种类型的消息,它们都包含一个共同的"type"键:
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
最终,我需要将其拆分为其他流,然后将它们切碎/聚合/处理。我希望能够将STRUCT 用于一切,但我目前的努力让我这样做:
create stream raw (type varchar, data varchar) \
with (kafka_topic='t_raw', value_format='JSON');
第一层,那么
create stream key1 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
select \
extractjsonfield(data, '$.ts') as ts, \
extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.b') as b \
from raw where type='key1';
create stream key2 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
select \
extractjsonfield(data, '$.ts') as ts, \
extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.c') as c, \
extractjsonfield(data, '$.d') as d \
from raw where type='key2';
这似乎可行,但最近添加了STRUCT,有没有办法像上面那样使用它来代替extractjsonfield?
ksql> select * from key1;
1542741621100 | null | 2018-11-20 19:20:21.1 | 1 | hello
1542741623300 | null | 2018-11-20 19:20:23.3 | 2 | hello2
^CQuery terminated
ksql> select * from key2;
1542741622200 | null | 2018-11-20 19:20:22.2 | 1 | 11 | goodbye
1542741624400 | null | 2018-11-20 19:20:24.4 | 3 | 22 | goodbye2
如果不是STRUCT,是否有一种直接的方法可以使用香草卡夫卡流(副ksql,因此apache-kafka-streams 标签)?
有没有更卡夫卡式/高效/优雅的方式来解析这个?
我无法将其定义为空的STRUCT<>
ksql> CREATE STREAM some_input ( type VARCHAR, data struct<> ) \
WITH (KAFKA_TOPIC='t1', VALUE_FORMAT='JSON');
line 1:52: extraneous input '<>' expecting {',', ')'}
some (not-so-recent) discussion 可以做类似的事情
CREATE STREAM key1 ( a INT, b VARCHAR ) AS \
SELECT data->* from some_input where type = 'key1';
仅供参考:上述解决方案在 confluent-5.0.0 中不起作用,a recent patch 修复了 extractjsonfield 错误并启用了此解决方案。
真实数据有更多类似的消息类型。它们都包含"type" 和"data" 键(顶层没有其他键),并且几乎所有的"ts" 时间戳等效项都嵌套在"data" 中。
【问题讨论】:
-
我知道 Avro 可以做到这一点,并且使数据中的 c 字段在 Avro 架构中具有一些其他默认值,但不确定 JSON 是否可以这样工作
-
@MatthiasJ.Sax,我拥有(并保留)apache-kafka-streams 的原因是,由于这是一个流而不是(还)一个表,它可以通过非
ksql机制。我对此持开放态度,即使这不是我的专长。不过,感谢您的编辑。 -
@cricket_007,我在 avro 规范中没有看到任何允许我在那里有条件的
"data"定义的内容,也没有在create stream ...中的 def 中看到任何内容,所以这并不让我感到惊讶完全。谢谢。 -
对于这种情况,您应该在问题中提及这一点:)
-
在我们键入时进行编辑...
标签: apache-kafka-streams apache-kafka apache-kafka-streams ksqldb