【问题标题】:Convert Hadoop spark Scala function to Java将 Hadoop spark Scala 函数转换为 Java
【发布时间】:2021-02-18 18:00:20
【问题描述】:

尝试将此函数从 Scala 转换为 Java:

def ib_solr_doc_join_doc_prepare(name: String): RDD[SFCAdTargetingDoc] = {
  val inputPath = INPUT + name

  val data = sc.sequenceFile(inputPath, classOf[Text], classOf[Text]).map(x => x._2.toString)
  data mapPartitions { p =>
    val gson = new Gson
    p map { row =>
      gson.fromJson(row, classOf[SFCAdTargetingDoc])
    }
  }
}

据我所知,这个 sc 是 spark 上下文对象,所以我像这样使用 JavaSparkContext

JavaRDD<String> j = sc.sequenceFile(args[0], Text.class, Text.class).map(t -> new String(t._2().getBytes()));

像这样使用 gson:

GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();

Gson gson = builder.create();

主要故障位置是

data mapPartitions { p =>
  val gson = new Gson
  p map { row =>
    gson.fromJson(row, classOf[SFCAdTargetingDoc])
  }
}

is 上下文中的数据是 JavaRDD,但 mapPartitions 在 Java 中的行为与在 scala 中的行为完全不同。

【问题讨论】:

  • 行为上有什么区别?我能看到的唯一原因是创建新字符串:new String(t._2().getBytes() vs x._2.toString - 我认为这部分会产生不同的结果
  • j.mapPartitions { p-> } p 是一个没有映射函数的迭代器,我必须使用 forEachRemaining 来代替。这也会弹出: AbstractJavaRDDLike> 类型中的方法 mapPartitions(FlatMapFunction,U>) 不适用于参数 (( p) -> {})

标签: java scala apache-spark hadoop hdfs


【解决方案1】:

我会尝试根据您的回答提出解决方案

是的,java 对此类操作不太方便,提供的内置功能也不太实用,因此您应该全部手动完成。

(p -> {}) 不适用的原因是 lambda 应该返回 Iterable 以被视为 FlatMapFunction<.. u>。而在java中,每个运算符都不是一个表达式,就像在scala中一样,你应该手动指定你的函数的值。

此外,由于 Iterator 没有映射功能,我们不得不自己遍历它并进行映射。以上所有都将我们引向下一个解决方案

data.mapPartitions(p -> {
    Gson gson = new Gson()
    List<SCFAdTargetingDoc> result = ArrayList<>();
    while (t.hasNext()) {
      result.add(gson.fromJson(t.next(), SCFAdTargetingDoc.class))
    }

    return result;
})

这看起来很难看,但应该可以。此外,如果您想使用类似 scala 的操作,例如 map,您可以考虑使用具有这些功能的 Java Streams。但是将迭代器转换为流也不是很简单。有多种方法可以做到这一点,我发现这个很有用:

Iterable<String> iterable = () -> i;
Stream<String> sttream = StreamSupport.stream(iterable.spliterator(), false);

这将我们的示例转换为:

data.mapPartitions(p -> {
  Iterable<String> iterable = () -> p;
  Stream<String> stream = StreamSupport.stream(iterable.spliterator(), false);

  return stream.map(s -> gson.fromJson(s, 
    SCFAdTargetingDoc.class)).collect(Collectors.toList());
}) 

它仍然不如 scala 代码简洁,但它仍然是 java。如果您的代码中有很多这样的调用,您可以为可重复操作创建单独的函数,例如创建流和转换为列表,您的代码将如下所示

data.mapPartitions(p -> toList(streamify(p).map(s -> gson.fromJson(s, 
    SCFAdTargetingDoc.class)))

这可能是您使用 java 无需太多努力即可实现的最佳效果。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-06-11
    • 2018-03-05
    • 2020-02-09
    • 1970-01-01
    • 2021-11-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多