【发布时间】:2022-09-27 20:02:44
【问题描述】:
Kafka 中的 INPUT_DATA 主题获取使用相同 JSON 模式发布的所有数据消息:
{
\"year\": {
\"month\": {
\"day\": {
\"hour\": string
}
}
}
}
首先,我使用持久性 ksql 查询创建 Kafka Stream。此流将读取发布到INPUT_TOPIC 的数据消息
CREATE OR REPLACE STREAM INPUT_STREAM (year STRUCT<month STRUCT<day STRUCT<hour VARCHAR>>>) WITH (KAFKA_TOPIC = \'INPUT_TOPIC\', VALUE_FORMAT = \'JSON\');
我使用 Confluent Kafka Web UI 确认 INPUT_STREAM 已正确创建,其架构定义为支持遵循架构的数据消息
{year: {month: {day: hour: string}}}}
接下来我需要创建第二个OUTPUT_STREAM。将创建此流以读取来自INPUT_STREAM 的数据消息。它将通过删除顶级 year 字段来重新映射原始输入 JSON 消息。因此,结果消息不是 4 级深,而是 3 级深,例如: 1.month > 2.day > 3.hour :
{
\"month\": {
\"day\": {
\"hour\": string
}
}
}
OUTPUT_STREAM 会将重新映射的 JSON 消息写入它自己的 OUTPUT_TOPIC。此流的消息模式应为 3 级深:
所以整个设置由两个主题和两个流组成:
INPUT_TOPIC > INPUT_STREAM > OUTPUT_STREAM > OUTPUT_TOPIC
我继续整理下面发布的ksql 声明。它尝试将YEAR->MONTH->DAY->HOUR 重新映射为STRUCT<MONTH STRUCT<DAY STRUCT<HOUR VARCHAR>>>:
CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC=\'OUTPUT_TOPIC\', REPLICAS=3) AS SELECT YEAR->MONTH->DAY->HOUR HOUR STRUCT<MONTH STRUCT<DAY STRUCT<HOUR VARCHAR>>> FROM INPUT_DATA_STREAM EMIT CHANGES;
但它失败并出现错误Line 1:8: no viable alternative at input
是否可以使用 ksql 和流或表将输入数据中的值重新映射到不同的 json 模式?
-
如果要删除顶部字段,只需要
SELECT YEAR->MONTH。 IE。选择那个结构.架构应该是自动的... -
请使用显示此建议的 ksql 语句将其发布为答案。
-
我不使用ksql,但是那行得通吗?并没有真正回答你得到的错误
标签: apache-kafka ksqldb