【问题标题】:How to read the nested avro fields for creating streams?如何读取嵌套的 avro 字段以创建流?
【发布时间】:2018-03-22 15:46:52
【问题描述】:

我在 Kafka 主题中有以下 AVRO 消息。

{
"table": {
    "string": "Schema.xDEAL"
},
"op_type": {
    "string": "Insert"
},
"op_ts": {
    "string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
    "string": "2018-03-16 10:03:37.778000"
},
"pos": {
    "string": "00000000000000010722"
},
"before": null,
"after": {
    "row": {
        "DEA_PID_DEAL": {
            "string": "AAAAAAAA"
        },
        "DEA_NME_DEAL": {
            "string": "MY OGG DEAL"
        },
        "DEA_NME_ALIAS_NAME": {
            "string": "MY OGG DEAL"
        },
        "DEA_NUM_DEAL_CNTL": {
            "string": "4swb6zs4"
        }           
    }
}

}

当我运行以下查询时。它使用空值创建流。

   CREATE STREAM tls_deal (DEA_PID_DEAL VARCHAR, DEA_NME_DEAL varchar, DEA_NME_ALIAS_NAME VARCHAR, DEA_NUM_DEAL_CNTL VARCHAR) WITH (kafka_topic='deal-ogg-topic',value_format='AVRO', key = 'DEA_PID_DEAL');

但是当我将 AVRO 消息更改为跟随它时,它会起作用。

 {
"table": {
    "string": "Schema.xDEAL"
},
"op_type": {
    "string": "Insert"
},
"op_ts": {
    "string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
    "string": "2018-03-16 10:03:37.778000"
},
"pos": {
    "string": "00000000000000010722"
},
"DEA_PID_DEAL": {
    "string": "AAAAAAAA"
},
"DEA_NME_DEAL": {
    "string": "MY OGG DEAL"
},
"DEA_NME_ALIAS_NAME": {
    "string": "MY OGG DEAL"
},
"DEA_NUM_DEAL_CNTL": {
    "string": "4swb6zs4"
}           

}

现在,如果我运行上述查询,数据将被填充。

我的问题是如果我需要从嵌套字段填充流,我该如何处理?

我无法在 KSQL 文档页面中找到解决方案。

提前致谢。我很感激帮助。 :)

【问题讨论】:

  • 出于好奇,这条消息是从哪里来的?根据您的源数据库,Debezium 可能是一种替代方案;正如 Robin 所说,我们提供了一个 SMT,可让您提取 CDC 消息的“之后”状态。
  • 是的,你是对的,但我们使用的是 OGG。

标签: apache-kafka avro confluent-platform ksqldb


【解决方案1】:

正如 Robin 所说,当前不支持此功能(2018 年 3 月 22 日/v0.5)。但是,这是一个跟踪的功能请求。您可能想在 KSQL 存储库中投票或跟踪这个 Github 问题:

https://github.com/confluentinc/ksql/issues/638

【讨论】:

  • 预计何时发布。
  • 感谢您的回复。这真的很有帮助。我很感激。
  • 我无法提供任何细节,但它在列表中排名靠前。
  • 感谢您的确认。它有助于规划。 :)
【解决方案2】:

KSQL 目前(2018 年 3 月 22 日/v0.5)不支持嵌套 Avro。您可以使用单消息转换来扁平化来自 Kafka Connect 的数据。例如,Debezium 附带 UnwrapFromEnvelope

【讨论】:

  • 感谢罗宾的回复 真的很有帮助。我很感激。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-05-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多