【Question Title】: How to Construct Nested JSON Message on Output Topic in KSQLDB
【Posted】: 2020-08-18 22:40:10
【Question Description】:

I receive the following event payloads from one of our source systems.

I created Stream1 over the following JSON payloads.

Event JSON 1

{
  "event": {
    "header": {
      "name": "abc",
      "version": "1.0",
      "producer": "123",
      "channel": "lab",
      "countryCode": "US"
    },
    "body": {
      "customerIdentifiers": [
        {"customerIdentifier": "1234", "customerIdType": "cc"},
        {"customerIdentifier": "234", "customerIdType": "id"}
      ],
      "accountIdentifiers": [
        {"accountIdentifier": "123", "accountIdType": "no"},
        {"accountIdentifier": "Primary", "accountIdType": "da"}
      ],
      "eventDetails": {
        "offeramount": "40000",
        "apr": "2.6%",
        "minpayment": "400",
        "status": "Approved"
      }
    }
  }
}

Event JSON 2

{
  "event": {
    "header": {
      "name": "abc",
      "version": "1.0",
      "producer": "123",
      "channel": "lab",
      "countryCode": "US"
    },
    "body": {
      "customerIdentifiers": [
        {"customerIdentifier": "1234", "customerIdType": "cc"},
        {"customerIdentifier": "234", "customerIdType": "id"}
      ],
      "accountIdentifiers": [
        {"accountIdentifier": "123", "accountIdType": "no"},
        {"accountIdentifier": "Primary", "accountIdType": "da"}
      ],
      "eventDetails": {
        "offeramount": "70000",
        "apr": "3.6%",
        "minpayment": "600",
        "status": "Rejected"
      }
    }
  }
}

I created an aggregate table on top of Stream1:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    avg(minpayment) as Avg_MinPayment, 
    avg(apr) AS Avg_APr, 
    avg(offeramount) AS Avgofferamount , 
    status 
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;

Status   | Avg_MinPayment | Avg_APr | Avgofferamount
---------+----------------+---------+---------------
Approved | 400            | 2.6%    | 40000
Rejected | 600            | 3.6%    | 70000

I got the result above from the KTable, and the KTable's topic JSON looks like this:

Aggregate JSON 1

PRINT EVENT_TABLE;

{
  "Status" : "Approved", 
  "Avg_Minpayment" : "400", 
  "Avg_APr" : "2.6%", 
  "offeramount" : "40000"
}

Aggregate JSON 2

{
  "Status" : "Rejected", 
  "Avg_Minpayment" : "600", 
  "Avg_APr" : "3.6%", 
  "offeramount" : "70000"
}

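But I need to build the final target JSON in the format below and publish it to the output topic. That is, I need to add the header and body around Aggregate JSON 1 and Aggregate JSON 2.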

{
  "event": {
    "header": {
      "name": "abc",
      "version": "1.0",
      "producer": "123",
      "channel": "lab",
      "countryCode": "US"
    },
    "body": {
      "Key": [
        {"Status": "approved", "Avg_Minpayment": "400", "Avg_APr": "2.6%", "offeramount": "40000"},
        {"Status": "rejected", "Avg_Minpayment": "600", "Avg_APr": "3.6%", "offeramount": "70000"}
      ]
    }
  }
}
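To make the target shape concrete, here is a small sketch in plain Python (outside Kafka and ksqlDB, purely to illustrate the data shape) that wraps the two aggregate rows in the static header. The function name build_output and the HEADER constant are hypothetical; the header values are copied from the example payloads above.

```python
import json

# Static header values, copied from the example payloads in the question.
HEADER = {
    "name": "abc",
    "version": "1.0",
    "producer": "123",
    "channel": "lab",
    "countryCode": "US",
}


def build_output(aggregate_rows, header=HEADER):
    """Wrap per-status aggregate rows in the event/header/body envelope."""
    return {
        "event": {
            "header": dict(header),
            # All aggregate rows end up in one array, so several table rows
            # are combined into a single output message.
            "body": {"Key": [dict(row) for row in aggregate_rows]},
        }
    }


rows = [
    {"Status": "Approved", "Avg_Minpayment": "400", "Avg_APr": "2.6%", "offeramount": "40000"},
    {"Status": "Rejected", "Avg_Minpayment": "600", "Avg_APr": "3.6%", "offeramount": "70000"},
]

print(json.dumps(build_output(rows), indent=2))
```

Note that the envelope holds every status in one message, so whatever produces it has to decide which table rows belong together.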

【Question Discussion】:

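  • Please format your code; it makes things easier for future users =)
  • It would also help if your example actually worked. For example, your SQL uses avg(salary), but there is no salary in your source data.
  • Unfortunately, I don't have enough information to comment further. Your desired output seems to combine multiple inputs, but you haven't explained how you want them grouped, and you haven't provided enough example input. Provide a more detailed description of the problem and I may be able to help.
  • @AndrewCoates. This is sample data, so I may have missed salary, but the main issue is that I get aggregated data like this: {"Status":"Approved","Avg_Sal":"10000","Avg_APr":"3.6%","offeramount":"40000"}, and I want to add the header and repeating attributes to the final output JSON. How do I build that JSON?
  • For the header attributes I can use any static values; they are only there for structure: {"header":{"name":"abc","version":"1.0","producer":"123","channel":"lab","countryCode":"US"}... I want to add this header to my aggregated result JSON. Is there any way to concatenate them? Or any solution for building the whole JSON in the stream?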

Tags: apache-kafka ksqldb


【Solution 1】:

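Given that your example SQL does not produce your example output from your example input, it is not entirely clear what you are trying to achieve. In fact, your example SQL would fail with an unknown-column error.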

Something like the following produces your example output:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    status,
    avg(eventDetails->minpayment) as Avg_MinPayment, 
    avg(eventDetails->apr) AS Avg_APr, 
    avg(eventDetails->offeramount) AS Avgofferamount
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;
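The per-status averaging this query is meant to perform can be modeled in a few lines of plain Python. This is only a sketch: the helper names are hypothetical, and stripping the trailing "%" from apr is an assumption about how the values should be interpreted. It also highlights that in the sample payloads minpayment, apr and offeramount are JSON strings ("400", "2.6%"); ksqlDB's AVG works on numeric columns, so the stream would presumably need numeric fields, or a CAST, before averaging.

```python
from collections import defaultdict

FIELDS = ("minpayment", "apr", "offeramount")


def to_number(value):
    """Parse strings such as "400" or "2.6%" into floats (assumption: drop a trailing %)."""
    return float(value.rstrip("%"))


def average_by_status(events):
    """Group eventDetails by status and average the numeric fields (like GROUP BY status + AVG)."""
    sums = defaultdict(lambda: {field: 0.0 for field in FIELDS})
    counts = defaultdict(int)
    for message in events:
        details = message["event"]["body"]["eventDetails"]
        status = details["status"]
        counts[status] += 1
        for field in FIELDS:
            sums[status][field] += to_number(details[field])
    return {
        status: {field: total / counts[status] for field, total in totals.items()}
        for status, totals in sums.items()
    }


events = [
    {"event": {"body": {"eventDetails": {"offeramount": "40000", "apr": "2.6%", "minpayment": "400", "status": "Approved"}}}},
    {"event": {"body": {"eventDetails": {"offeramount": "70000", "apr": "3.6%", "minpayment": "600", "status": "Rejected"}}}},
]

print(average_by_status(events))
```

With only one event per status, each average simply equals that event's values, which is why the example table looks like a copy of the input.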

Next, your example output...

Status   | Avg_MinPayment | Avg_APr | Avgofferamount
---------+----------------+---------+---------------
Approved | 400            | 2.6%    | 40000
Rejected | 600            | 3.6%    | 70000

...has one row per status. However, the output you say you want to achieve...

{
  "event": {
    "header": {
      "name": "abc",
      "version": "1.0",
      "producer": "123",
      "channel": "lab",
      "countryCode": "US"
    },
    "body": {
      "Key": [
        {"Status": "approved", "Avg_Minpayment": "400", "Avg_APr": "2.6%", "offeramount": "40000"},
        {"Status": "rejected", "Avg_Minpayment": "600", "Avg_APr": "3.6%", "offeramount": "70000"}
      ]
    }
  }
}

...contains both statuses, i.e. it combines your two example input messages into a single output.

If I understand you correctly, and you really do want to output the JSON above, then:

You first need to include the event information. But which event information? If you know it is always the same, then you could use:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    status,
    latest_by_offset(event) as event,
    avg(eventDetails->minpayment) as Avg_MinPayment, 
    avg(eventDetails->apr) AS Avg_APr, 
    avg(eventDetails->offeramount) AS Avgofferamount
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;
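As a rough model of what LATEST_BY_OFFSET does per group, the sketch below keeps, for each status, the value from the record with the highest offset. It is plain Python, not ksqlDB: records are hypothetical (offset, record) pairs, it assumes a single partition, and the "web" channel is an invented value used only to show a later record overwriting an earlier one.

```python
def latest_by_offset(records, group_field, value_field):
    """Per group, keep value_field from the record with the highest offset."""
    latest = {}
    # Process records in offset order so later records overwrite earlier ones.
    for _offset, record in sorted(records, key=lambda pair: pair[0]):
        latest[record[group_field]] = record[value_field]
    return latest


records = [
    (0, {"status": "Approved", "header": {"channel": "lab"}}),
    (1, {"status": "Rejected", "header": {"channel": "lab"}}),
    (2, {"status": "Approved", "header": {"channel": "web"}}),  # hypothetical later event
]

print(latest_by_offset(records, "status", "header"))
# {'Approved': {'channel': 'web'}, 'Rejected': {'channel': 'lab'}}
```

This illustrates the concern raised next: if two events with the same status carry different header data, only the most recent header survives.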

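The latest_by_offset aggregate function captures the event information from the last message it saw. However, I'm not convinced this is what you want. Couldn't you receive other rejected or accepted messages with different event information? If it is the event information that determines which messages should be grouped together, then something like this might give you something close to what you want: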

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    event,
    collect_list(eventDetails) as body
  FROM STREAM1 
  GROUP BY event 
  EMIT CHANGES;
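The grouping this query performs (one row per distinct event value, with every eventDetails collected into a list, as COLLECT_LIST does) can be sketched in plain Python. Two assumptions: the "event information" used as the grouping key is taken to be the header only, and the sample events are trimmed to a couple of fields. The function name collect_by_header is hypothetical.

```python
import json


def collect_by_header(events):
    """Group events by header and collect each event's eventDetails into a list."""
    groups = {}
    for message in events:
        header = message["event"]["header"]
        # Dicts are not hashable, so a canonical JSON string serves as the group key.
        group_key = json.dumps(header, sort_keys=True)
        group = groups.setdefault(group_key, {"event": header, "body": []})
        group["body"].append(message["event"]["body"]["eventDetails"])
    return list(groups.values())


header = {"name": "abc", "version": "1.0", "producer": "123", "channel": "lab", "countryCode": "US"}
events = [
    {"event": {"header": header, "body": {"eventDetails": {"status": "Approved", "minpayment": "400"}}}},
    {"event": {"header": header, "body": {"eventDetails": {"status": "Rejected", "minpayment": "600"}}}},
]

for group in collect_by_header(events):
    print(group["event"]["channel"], [d["status"] for d in group["body"]])
```

Because both sample events share the same header, they fall into a single group, which is what combines the two statuses into one output.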

If that is close, then you may need to use the STRUCT constructor and the AS_VALUE function to restructure your output. For example:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    event as key,
    AS_VALUE(event) as event,
    STRUCT(
      keys := collect_list(eventDetails)
    ) as body
  FROM STREAM1 
  GROUP BY event 
  EMIT CHANGES;
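In ksqlDB the GROUP BY column becomes the record key, and AS_VALUE copies a key column into the value as well. The plain-Python sketch below mimics the resulting (key, value) pair for one group. The function name to_output_record is hypothetical, and the list field is named "Key" to match the target JSON (the query above calls it keys). One caveat: ksqlDB upper-cases unquoted identifiers, so the real output would likely use field names such as EVENT, BODY and KEYS unless they are quoted with backticks.

```python
import json


def to_output_record(header, details_list, list_field="Key"):
    """Build a (key, value) pair shaped like the target message.

    The key is the grouping key (the header); the value repeats it under
    "event", mirroring AS_VALUE, and nests the collected list under "body".
    """
    key = json.dumps(header, sort_keys=True)
    value = json.dumps({"event": header, "body": {list_field: details_list}})
    return key, value


header = {"name": "abc", "version": "1.0", "producer": "123", "channel": "lab", "countryCode": "US"}
details = [
    {"Status": "Approved", "Avg_Minpayment": "400", "Avg_APr": "2.6%", "offeramount": "40000"},
    {"Status": "Rejected", "Avg_Minpayment": "600", "Avg_APr": "3.6%", "offeramount": "70000"},
]

key, value = to_output_record(header, details)
print(value)
```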
