【问题标题】:Using Custom Hadoop input format for processing binary file in Spark在 Spark 中使用自定义 Hadoop 输入格式处理二进制文件
【发布时间】:2015-10-30 13:55:59
【问题描述】:

我开发了一个基于 hadoop 的解决方案来处理二进制文件。这使用了经典的 hadoop MR 技术。二进制文件大约 10GB,分为 73 个 HDFS 块,写成 map 进程的业务逻辑对这 73 个块中的每一个块进行操作。我们在 Hadoop 中开发了一个 customInputFormat 和 CustomRecordReader,它们将键(intWritable)和值(BytesWritable)返回给 map 函数。该值只不过是 HDFS 块的内容(二进制数据)。业务逻辑知道如何读取这些数据。

现在,我想将此代码移植到 spark 中。我是 spark 的初学者,可以在 spark 中运行简单的示例(wordcount、pi 示例)。但是,无法在 spark 中处理 binaryFiles 的简单示例。我看到这个用例有两种解决方案。首先,避免使用自定义输入格式和记录阅读器。在 spark 中找到一种方法(方法),为这些 HDFS 块创建 RDD,使用类似映射的方法将 HDFS 块内容提供给业务逻辑。如果这是不可能的,我想重新使用自定义输入格式和自定义阅读器,使用一些方法,如 HadoopAPI、HadoopRDD 等。我的问题:- 我不知道第一种方法是否可行。如果可能的话,任何人都可以提供一些包含示例的指针吗?我正在尝试第二种方法,但非常不成功。这是我使用的代码sn-p

package org {  
object Driver {      
  def myFunc(key : IntWritable, content : BytesWritable):Int = {      
    println(key.get())
    println(content.getSize())
    return 1       
  }    
  def main(args: Array[String]) {       
    // create a spark context
    val conf = new SparkConf().setAppName("Dummy").setMaster("spark://<host>:7077")
    val sc = new SparkContext(conf)    
    println(sc)   
    val rd = sc.newAPIHadoopFile("hdfs:///user/hadoop/myBin.dat", classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])  
    val count = rd.map (x => myFunc(x._1, x._2)).reduce(_+_)
    println("The count is *****************************"+count)
  }
} 

}

请注意,main 方法中的 print 语句会打印 73,即块数,而 map 函数中的 print 语句会打印 0。

有人能告诉我这里哪里做错了吗?我认为我没有以正确的方式使用 API,但未能找到一些文档/使用示例。

【问题讨论】:

  • 应该从myFunc 函数内部打印什么?块数和块大小?
  • 另外请记住,除非您在本地模式下运行 spark,否则在 map 函数中打印的内容将在工作节点日志中打印。
  • 是的.. 在 myfunc 内部,我希望可以打印任意数字的密钥,并且应该打印块大小应该为 128MB。但是,我看到的两者都是 0。这是我在工作节点日志中看到的。
  • 另外,为了处理字节,我想获取给定地图任务的 InputSplit。我在文献中读到我需要将 TaskContext 传递给 map 函数,有一个名为 mapPartitionsWithContext 的方法可以提供上下文。但此方法在 spark 1.5.1 中已弃用。无法弄清楚我应该使用哪种方法来获取任务上下文。此处的任何帮助/指示都将有助于进一步取得进展。

标签: scala hadoop apache-spark


【解决方案1】:

一目了然的几个问题。您定义了myFunc,但调用了func。你的myFunc 没有返回类型,所以你不能调用collect()。如果你的myFunc 真的没有返回值,你可以用foreach 代替map

collect() 将 RDD 中的数据拉到驱动程序中,以便您在本地(在驱动程序上)对其进行处理。

【讨论】:

    【解决方案2】:

    我在这个问题上取得了一些进展。我现在正在使用以下功能来完成这项工作

    var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat], 
            classOf[IntWritable], 
            classOf[BytesWritable],
            job.getConfiguration() 
            )    
    
    val count = hRDD.mapPartitionsWithInputSplit{ (split, iter) => myfuncPart(split, iter)}.collect()
    

    但是,出现了另一个错误,我在此处发布了详细信息 Issue in accessing HDFS file inside spark map function

    15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-06-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-01-27
      • 2016-10-31
      相关资源
      最近更新 更多