【发布时间】:2019-10-30 16:51:09
【问题描述】:
我有一个 spark 流上下文从 Kafka 消费者获取数据流。数据包含 JSON 对象。我需要将其转换为自定义 Java 对象,以便进行一些处理。有没有一种简单的方法可以做到这一点?基本上我想要一种将 JavaRDD 转换为普通字符串的方法,以便我可以使用 gson.fromJSON 将其转换为我的简单 POJO 类对象。
我尝试了一些方法,但遇到了序列化问题
JavaDStream jds = stream.map(x -> x.value());
jds.foreachRDD(x -> System.out.println(x.count()));
jds.foreachRDD(new VoidFunction<JavaRDD<String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<String> rdd) {
rdd.foreach(a -> {
TransactionData tr = gson.fromJson(a, TransactionData.class);
}
);
}
TransactionData 是一个普通的 Java bean 类,有两个字段 id 和 amount 以及它们的 getter/setter 方法
在上面的代码中,我收到关于序列化的错误。 这是错误: org.apache.spark.SparkException:任务不可序列化原因:java.io.NotSerializableException:com.google.gson.Gson 序列化堆栈:-对象不可序列化(类:com.google.gson.Gson,值:{serializeNulls:错误工厂:[工厂[typeHierarchy=com.google.gson.JsonElement,adapter=com.google.gson.internal.bind.TypeAdapters$25@35c645ea]....
关于如何解决这个问题的任何想法?
【问题讨论】:
-
这是错误:org.apache.spark.SparkException:任务不可序列化原因:java.io.NotSerializableException:com.google.gson.Gson 序列化堆栈:-对象不可序列化(类: com.google.gson.Gson,值:{serializeNulls:falsefactories:[Factory[typeHierarchy=com.google.gson.JsonElement,adapter=com.google.gson.internal.bind.TypeAdapters$25@35c645ea],
-
最好添加
TransactionData类包含的内容以及JavaRDD<String> rdd包含的字符串值。另外,请编辑问题并在此处添加例外,而不是在评论中。
标签: java json string apache-spark rdd