【发布时间】:2019-10-02 01:19:43
【问题描述】:
我正在使用 Flink,并且有一个 JSON 字符串流到达我的系统,其中包含动态变化的字段和嵌套字段。所以我不能模拟并将这个传入的 JSON 转换为静态 POJO,而我必须依赖 Map。
我的第一个转换是使用 GSON 解析将 JSON 字符串流转换为 Map 对象流,然后将映射包装在名为 Data 的 DTO 中。
(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);
Data data = new Data(map); // Data has getters, setters for the map and implements Serializable
在此转换处理之后,我尝试将生成的流馈送到我的自定义 Flink 接收器中时出现问题。调用函数不会在接收器中调用。但是,如果我从包含 DTO 的 Map 更改为没有 Map 的原始 DTO 或常规 DTO,则接收器可以工作。
我的 DTO 如下所示:
public class FakeDTO {
private String id;
private LinkedTreeMap map; // com.google.gson.internal
// getters and setters
// constructors, empty and with fields
我尝试了以下两种解决方案:
env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class;
env.getConfig().disableGenericTypes();
在这种情况下我可以使用任何专家建议吗?
【问题讨论】:
-
如果您在转换之后使用一个只记录有关地图的无操作
FilterFunction会发生什么?是否有任何记录?
标签: java hashmap gson apache-flink flink-streaming