【发布时间】:2021-09-24 13:37:16
【问题描述】:
我尝试通过 Flink 获取 Kinesis 数据。
就我而言,一条记录中有多条消息。我怎样才能把它分成多条记录? (我会将其发送到 Elasticsearch。)
我试图搜索它,但找不到合适的答案。
我的代码所做的是从 Kinesis 获取数据,解压缩 gzip,将其转换为字符串,然后将 objectMapper.readvalue 用于我的 POJO for java。
有两种 POJO:一种用于整个事件,一种用于 LogEvents。
{
"messageType":"DATA_MESSAGE","owner":"<account id>",
"logGroup":"<clustername>","logStream":"<log stream name>",
"subscriptionFilters":["<subscription name>"],
"logEvents":[
{"id":"<id>","timestamp":<timestamp>,"message":"msg 1"},
{"id":"<id>","timestamp":<timestamp>,"message":"msg 2"},
{"id":"<id>","timestamp":<timestamp>,"message":"msg 3"},
{"id":"<id>","timestamp":<timestamp>,"message":"msg 4"},
]
}
【问题讨论】:
标签: java apache-flink amazon-kinesis