【问题标题】:Apache Flink - How to implement custom Deserializer implementing DeserializationSchemaApache Flink - 如何实现自定义 Deserializer 实现 DeserializationSchema
【发布时间】:2020-10-25 15:02:56
【问题描述】:

我正在使用 Flink,并且正在使用 Kafka 连接器。我从 flink 收到的消息是逗号分隔的项目列表。 “'a','b','c',1,0.1 ....'12:01:00.000'” 其中一个包含事件时间,我想将此事件时间用于每个分区的水印(在 kafka 源中),然后将此事件时间用于会话窗口。 我的情况与平常有些不同,因为据我了解,人们通常使用“kafka Timestamps”和 SimpleStringSchema()。在我的情况下,我必须编写自己的反序列化器来实现 DeserializationSchema 并返回一个元组或 Pojo。所以基本上用我自己的函数替换 SimpleStringSchema()。 Flink 提供了一些开箱即用的反序列化器,但我真的不明白如何创建自定义反序列化逻辑。

查看flink网站我发现了这个:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html

我已经举了一个例子(谢谢大卫!),但我仍然不知道如何实现我的。

https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java

我真的需要一个示例来说明如何为列表执行此操作。上面指出的是 JSON,所以给了我理论和概念,但我被困在那里。

【问题讨论】:

  • 你尝试了什么?你在哪里卡住?您应该已创建自定义 POJO 并使用标准 CSV 解析器代替 ObjectMapper。您还可以试用 Table API/SQL,它具有开箱即用的 CSV 支持。
  • 您好 Arvid,我是 Java 和 Flink 的新手,所以我很难将您的建议付诸实践。我来自 python .net。下面来自 Mikalai 的示例正是我想要的。谢谢

标签: apache-flink flink-streaming


【解决方案1】:

你应该介绍POJO之类的

public class Event implements Serializable {
    ...
    private Long timestamp;
}

并实现与链接中的类似的简单反序列化器 - 您可以通过逗号手动拆分消息字符串来解析行,或者您可以使用开箱即用的 csv 阅读器(如opencsv)来解析行进入你的 POJO:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    @Override
    public ClickEvent deserialize(byte[] message) throws IOException {
        String line = new String(message, StandardCharsets.UTF_8);
        String[] parts = line.split(",");
        
        Event event = new Event();
        // TODO: parts to event here
        return event;
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}

【讨论】:

  • 这太棒了!!!给我几天时间,我可以消化/处理它,它看起来正是我想要的,在结构上与 JSON 非常相似,显然它很有意义。你还给我实现了可序列化的事件,太好了!非常感谢!!
  • 我已经准备好了一切,现在我在使用实现简单反序列化器的 Event 类时遇到了一些问题。我正在使用: - github.com/FasterXML/jackson-dataformats-text/tree/master/csv - mvnrepository.com/artifact/org.apache.flink/… 我会再试一次,看看我是否可以让它工作,但到目前为止还没有运气。如果我在接下来的几天找不到解决方案,我将为此发布另一个问题。谢谢!
猜你喜欢
  • 1970-01-01
  • 2018-10-05
  • 2016-02-26
  • 1970-01-01
  • 2020-08-31
  • 2020-08-30
  • 2010-12-02
  • 2016-07-06
  • 1970-01-01
相关资源
最近更新 更多