【问题标题】:Apache Flink: How do I use a stream of Java Map (or Map containing DTOs)?Apache Flink:如何使用 Java Map 流(或包含 DTO 的 Map)?
【发布时间】:2019-10-02 01:19:43
【问题描述】:

我正在使用 Flink,并且有一个 JSON 字符串流到达我的系统,其中包含动态变化的字段和嵌套字段。所以我不能模拟并将这个传入的 JSON 转换为静态 POJO,而我必须依赖 Map。

我的第一个转换是使用 GSON 解析将 JSON 字符串流转换为 Map 对象流,然后将映射包装在名为 Data 的 DTO 中。

(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);

Data data = new Data(map); // Data has getters, setters for the map and implements Serializable

在此转换处理之后,我尝试将生成的流馈送到我的自定义 Flink 接收器中时出现问题。调用函数不会在接收器中调用。但是,如果我从包含 DTO 的 Map 更改为没有 Map 的原始 DTO 或常规 DTO,则接收器可以工作。

我的 DTO 如下所示:

public class FakeDTO {
    private String id;
    private LinkedTreeMap map; // com.google.gson.internal

    // getters and setters
    // constructors, empty and with fields

我尝试了以下两种解决方案:

env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class; 
env.getConfig().disableGenericTypes();

在这种情况下我可以使用任何专家建议吗?

【问题讨论】:

  • 如果您在转换之后使用一个只记录有关地图的无操作FilterFunction 会发生什么?是否有任何记录?

标签: java hashmap gson apache-flink flink-streaming


【解决方案1】:

我能够解决这个问题。在我的 Flink 日志中,我看到没有找到一个名为 ReflectionSerializerFactory 类的 Kryo 文件。我在 maven 中更新了 Kryo 版本,并为我的地图使用了 Flink 文档说 Flink 支持的 Map 类型。

只需确保在您的代码中指定了泛型类型,并在您的 POJO 中为 Maps 添加 getter 和 setter。

我还使用 .returns(xyz.class) 类型声明来避免类型擦除的影响。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-11-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-03
    相关资源
    最近更新 更多