【发布时间】:2015-10-30 13:55:59
【问题描述】:
我开发了一个基于 hadoop 的解决方案来处理二进制文件。这使用了经典的 hadoop MR 技术。二进制文件大约 10GB,分为 73 个 HDFS 块,写成 map 进程的业务逻辑对这 73 个块中的每一个块进行操作。我们在 Hadoop 中开发了一个 customInputFormat 和 CustomRecordReader,它们将键(intWritable)和值(BytesWritable)返回给 map 函数。该值只不过是 HDFS 块的内容(二进制数据)。业务逻辑知道如何读取这些数据。
现在,我想将此代码移植到 spark 中。我是 spark 的初学者,可以在 spark 中运行简单的示例(wordcount、pi 示例)。但是,无法在 spark 中处理 binaryFiles 的简单示例。我看到这个用例有两种解决方案。首先,避免使用自定义输入格式和记录阅读器。在 spark 中找到一种方法(方法),为这些 HDFS 块创建 RDD,使用类似映射的方法将 HDFS 块内容提供给业务逻辑。如果这是不可能的,我想重新使用自定义输入格式和自定义阅读器,使用一些方法,如 HadoopAPI、HadoopRDD 等。我的问题:- 我不知道第一种方法是否可行。如果可能的话,任何人都可以提供一些包含示例的指针吗?我正在尝试第二种方法,但非常不成功。这是我使用的代码sn-p
package org {
object Driver {
def myFunc(key : IntWritable, content : BytesWritable):Int = {
println(key.get())
println(content.getSize())
return 1
}
def main(args: Array[String]) {
// create a spark context
val conf = new SparkConf().setAppName("Dummy").setMaster("spark://<host>:7077")
val sc = new SparkContext(conf)
println(sc)
val rd = sc.newAPIHadoopFile("hdfs:///user/hadoop/myBin.dat", classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])
val count = rd.map (x => myFunc(x._1, x._2)).reduce(_+_)
println("The count is *****************************"+count)
}
}
}
请注意,main 方法中的 print 语句会打印 73,即块数,而 map 函数中的 print 语句会打印 0。
有人能告诉我这里哪里做错了吗?我认为我没有以正确的方式使用 API,但未能找到一些文档/使用示例。
【问题讨论】:
-
应该从
myFunc函数内部打印什么?块数和块大小? -
另外请记住,除非您在本地模式下运行 spark,否则在 map 函数中打印的内容将在工作节点日志中打印。
-
是的.. 在 myfunc 内部,我希望可以打印任意数字的密钥,并且应该打印块大小应该为 128MB。但是,我看到的两者都是 0。这是我在工作节点日志中看到的。
-
另外,为了处理字节,我想获取给定地图任务的 InputSplit。我在文献中读到我需要将 TaskContext 传递给 map 函数,有一个名为 mapPartitionsWithContext 的方法可以提供上下文。但此方法在 spark 1.5.1 中已弃用。无法弄清楚我应该使用哪种方法来获取任务上下文。此处的任何帮助/指示都将有助于进一步取得进展。
标签: scala hadoop apache-spark