【问题标题】:Divide one record to multiple in deserialize stage在反序列化阶段将一条记录划分为多条
【发布时间】:2021-09-24 13:37:16
【问题描述】:

我尝试通过 Flink 获取 Kinesis 数据。

就我而言,一条记录中有多条消息。我怎样才能把它分成多条记录? (我会将其发送到 Elasticsearch。)

我试图搜索它,但找不到合适的答案。

我的代码所做的是从 Kinesis 获取数据,解压缩 gzip,将其转换为字符串,然后将 objectMapper.readvalue 用于我的 POJO for java。

有两种 POJO:一种用于整个事件,一种用于 LogEvents。

{
  "messageType":"DATA_MESSAGE","owner":"<account id>",
  "logGroup":"<clustername>","logStream":"<log stream name>",
  "subscriptionFilters":["<subscription name>"],
  "logEvents":[
    {"id":"<id>","timestamp":<timestamp>,"message":"msg 1"},
    {"id":"<id>","timestamp":<timestamp>,"message":"msg 2"},
    {"id":"<id>","timestamp":<timestamp>,"message":"msg 3"},
    {"id":"<id>","timestamp":<timestamp>,"message":"msg 4"},
  ]
}

【问题讨论】:

    标签: java apache-flink amazon-kinesis


    【解决方案1】:

    您的反序列化数据的processElement() 方法可以多次调用output()。每次使用批次的不同元素。即,您需要在循环内循环调用output() 中的元素。

    链中的下一个运算符将获取单个元素。

    【讨论】:

      【解决方案2】:
      • 也许你可以使用平面地图功能

      • FlatMapFunction 的核心方法。从输入数据集中获取一个元素并将其转换为零个、一个或多个元素。

          datastream.flatMap(
                  new FlatMapFunction<POJO, LogEvent>() {
                      @Override
                      public void flatMap(POJO input, Collector<LogEvent> out) throws Exception {
                          LogEvent logEvent = xxxxx;
                          out.collect(logEvent);
                      }
                  });
        

      【讨论】:

        【解决方案3】:

        示例 JSON 具有数组属性。您需要编写一个自定义的用户定义函数将 JSON 转换为多行,并使用 Flink 中的 flatMap 函数来打破嵌套。

        通过这样做,您将能够以您需要的格式从 JSON 中提取行,并将它们作为行发送给后续的 Flink 操作员。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-02-24
          • 2019-02-18
          • 1970-01-01
          • 1970-01-01
          • 2019-07-18
          相关资源
          最近更新 更多