【发布时间】:2021-02-18 18:00:20
【问题描述】:
尝试将此函数从 Scala 转换为 Java:
def ib_solr_doc_join_doc_prepare(name: String): RDD[SFCAdTargetingDoc] = {
val inputPath = INPUT + name
val data = sc.sequenceFile(inputPath, classOf[Text], classOf[Text]).map(x => x._2.toString)
data mapPartitions { p =>
val gson = new Gson
p map { row =>
gson.fromJson(row, classOf[SFCAdTargetingDoc])
}
}
}
据我所知,这个 sc 是 spark 上下文对象,所以我像这样使用 JavaSparkContext
JavaRDD<String> j = sc.sequenceFile(args[0], Text.class, Text.class).map(t -> new String(t._2().getBytes()));
像这样使用 gson:
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
Gson gson = builder.create();
主要故障位置是
data mapPartitions { p =>
val gson = new Gson
p map { row =>
gson.fromJson(row, classOf[SFCAdTargetingDoc])
}
}
is 上下文中的数据是 JavaRDD,但 mapPartitions 在 Java 中的行为与在 scala 中的行为完全不同。
【问题讨论】:
-
行为上有什么区别?我能看到的唯一原因是创建新字符串:new String(t._2().getBytes() vs x._2.toString - 我认为这部分会产生不同的结果
-
j.mapPartitions { p-> } p 是一个没有映射函数的迭代器,我必须使用 forEachRemaining 来代替。这也会弹出: AbstractJavaRDDLike
> 类型中的方法 mapPartitions(FlatMapFunction ,U>) 不适用于参数 (( p) -> {})
标签: java scala apache-spark hadoop hdfs