【发布时间】:2020-08-18 22:40:10
【问题描述】:
我从其中一个源系统收到了以下事件负载
为以下 json 负载创建 Stream1
事件 JSON 1
{
"event": {
"header": {
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},
{"customerIdentifier":"234","customerIdType":"id"}
],
"accountIdentifiers":[
{"accountIdentifier":"123","accountIdType":"no"},
{"accountIdentifier":"Primary","accountIdType":"da"}
],
"eventDetails":{
"offeramount":"40000",
"apr":"2.6%",
"minpayment":"400",
"status":"Approved"
}
}
}
事件 JSON 2
{
"event": {
"header": {
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},
{"customerIdentifier":"234","customerIdType":"id"}
],
"accountIdentifiers":[
{"accountIdentifier":"123","accountIdType":"no"},
{"accountIdentifier":"Primary","accountIdType":"da"}
],
"eventDetails":{
"offeramount":"70000",
"apr":"3.6%",
"minpayment":"600",
"status":"Rejected"
}
}
}
我在上面的stream1上创建了聚合表
CREATE TABLE EVENT_TABLE AS
SELECT
avg(minpayment) as Avg_MinPayment,
avg(apr) AS Avg_APr,
avg(offeramount) AS Avgofferamount ,
status
FROM STREAM1
GROUP BY status
EMIT CHANGES;
Status | Avg_MinPayment | Avg_APr | Avgofferamount
-----------------------------------------
Approved | 400 | 2.6% | 40000
Rejected | 600 | 3.6% | 70000
我从 KTable 和 KTable Topic json 中得到了上面的结果,看起来像这样
聚合 JSON1
打印“事件表”;
{
"Status" : "Approved",
"Avg_Minpayment" : "400",
"Avg_APr" : "2.6%",
"offeramount" : "40000"
}
聚合 JSON2
{
"Status" : "Rejected",
"Avg_Minpayment" : "600",
"Avg_APr" : "3.6%",
"offeramount" : "70000"
}
但我必须在输出主题上构建并发布最终目标 json,如下 json 格式。我必须将标题和正文添加到聚合 json1 和聚合 json2。
{
"event":{
"header":{
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{
"Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
]
}
}
【问题讨论】:
-
请format your code 方便用户在未来使用 =)
-
如果您的示例有效,它也会有所帮助。比如你的sql使用
avg(salary),但是你的源数据中没有salary。 -
很遗憾,我没有足够的信息来发表更多评论。您所需的输出似乎是将多个输入组合在一起。但是您没有解释您希望它们如何分组,也没有提供足够的示例输入。提供更详细的问题描述,我可能会提供帮助。
-
@AndrewCoates。这是样本数据,我可能会错过薪水,但主要问题是我会得到这样的聚合数据 {“Status”:“Approved”,“Avg_Sal”:“10000”,“Avg_APr”:“3.6%”,“offeramount” :“40000”},我想将标题和重复属性添加到最终输出的 json 数据中。我如何构建 json
-
对于标题属性,我可以添加任何静态值,它仅用于结构 {"header":{"name":"abc","version":"1.0","producer":" 123","channel":"lab","countryCode":"US"}....想要将此标头添加到我的汇总结果 json 中。有什么方法可以连接吗?或任何在流中构建整个 json 的解决方案?
标签: apache-kafka ksqldb