【问题标题】:How to remamp json with ksql in Kafka如何在 Kafka 中使用 ksql 重新映射 json
【发布时间】:2022-09-27 20:02:44
【问题描述】:

Kafka 中的 INPUT_DATA 主题获取使用相同 JSON 模式发布的所有数据消息:

{
  \"year\": {
    \"month\": {
      \"day\": {
        \"hour\": string
        }
      }
    }
}

首先,我使用持久性 ksql 查询创建 Kafka Stream。此流将读取发布到INPUT_TOPIC 的数据消息

CREATE OR REPLACE STREAM INPUT_STREAM (year STRUCT<month STRUCT<day STRUCT<hour VARCHAR>>>) WITH (KAFKA_TOPIC = \'INPUT_TOPIC\', VALUE_FORMAT = \'JSON\');

我使用 Confluent Kafka Web UI 确认 INPUT_STREAM 已正确创建,其架构定义为支持遵循架构的数据消息

{year: {month: {day: hour: string}}}}

接下来我需要创建第二个OUTPUT_STREAM。将创建此流以读取来自INPUT_STREAM 的数据消息。它将通过删除顶级 year 字段来重新映射原始输入 JSON 消息。因此,结果消息不是 4 级深,而是 3 级深,例如: 1.month > 2.day > 3.hour :

{
  \"month\": {
    \"day\": {
      \"hour\": string
      }
    }
}

OUTPUT_STREAM 会将重新映射的 JSON 消息写入它自己的 OUTPUT_TOPIC。此流的消息模式应为 3 级深:

所以整个设置由两个主题和两个流组成:

INPUT_TOPIC > INPUT_STREAM > OUTPUT_STREAM > OUTPUT_TOPIC

我继续整理下面发布的ksql 声明。它尝试将YEAR-&gt;MONTH-&gt;DAY-&gt;HOUR 重新映射为STRUCT&lt;MONTH STRUCT&lt;DAY STRUCT&lt;HOUR VARCHAR&gt;&gt;&gt;

CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC=\'OUTPUT_TOPIC\', REPLICAS=3) AS SELECT YEAR->MONTH->DAY->HOUR HOUR STRUCT<MONTH STRUCT<DAY STRUCT<HOUR VARCHAR>>> FROM INPUT_DATA_STREAM EMIT CHANGES;

但它失败并出现错误Line 1:8: no viable alternative at input

是否可以使用 ksql 和流或表将输入数据中的值重新映射到不同的 json 模式?

  • 如果要删除顶部字段,只需要SELECT YEAR-&gt;MONTH。 IE。选择那个结构.架构应该是自动的...
  • 请使用显示此建议的 ksql 语句将其发布为答案。
  • 我不使用ksql,但是那行得通吗?并没有真正回答你得到的错误

标签: apache-kafka ksqldb


【解决方案1】:

SELECT YEAR-&gt;MONTH-&gt;DAY-&gt;HOUR 仅选择最里面的 hour 字符串字段。

要选择月份结构,您只需要SELECT YEAR-&gt;MONTH,这将带来该结构中的嵌套数据。

架构应该从原始流中推断出来,因此您不需要重新定义它。

【讨论】:

  • 这工作为:CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC='OUTPUT_TOPIC', REPLICAS=3) AS SELECT YEAR-&gt;MONTH-&gt;DAY DAY FROM INPUT_STREAM EMIT CHANGES;
  • 那个输出是“3 级深”?从字面上阅读该查询,我只希望它返回{"day": {"hour": ... }}
  • 删除year 作为最顶层字段:CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC='OUTPUT_TOPIC', REPLICAS=3) AS SELECT YEAR-&gt;MONTH MONTH FROM INPUT_STREAM EMIT CHANGES;
猜你喜欢
  • 2019-04-15
  • 1970-01-01
  • 2018-10-12
  • 2019-12-23
  • 2010-10-05
  • 1970-01-01
  • 1970-01-01
  • 2020-03-26
  • 2012-08-09
相关资源
最近更新 更多