【问题标题】:Scalding read multiple files from HDFS烫伤从 HDFS 读取多个文件
【发布时间】:2015-02-20 02:30:56
【问题描述】:

如何从 HDFS 上的目录中读取所有文件并使用 scalding 对其进行处理。对于本地文件系统,我使用以下

import com.twitter.scalding._
import com.twitter.scalding.JsonLine
import java.io._

class ParseJsonJob(args: Args) extends Job(args) {
  val fileList = new File(args("input")).listFiles
  val fields = ('device_guid
                ,'service_name
                ,'event_type
               )

  fileList.map {
    fileName =>
      JsonLine(fileName.toString, fields)
      .read
      .filter ('service_name) { name: String => name == "myservice" }
      .write(Tsv(args("output") + fileName.toString.split("/").last))
  }
}

这不适用于 HDFS。 TextLine 或 JsonLine 是否除了文件之外还读取目录?

【问题讨论】:

    标签: scala hadoop scalding


    【解决方案1】:

    您将获得一个 Hadoop 文件系统并使用 FileSystem.liststatus 原语扫描 HDFS 目录,类似于:

    ...
    val hadoopConf= implicitly[Mode] match {
      case Hdfs(_, conf) => conf
    }
    val fs= FileSystem.get(hadoopConf)
    for(status <- fs.listStatus(new Path(args("input")))) {
      JsonLine(status.getPath.toString.toString, fields)
          .read
          .filter ('service_name) { name: String => name == "myservice" }
          .write(Tsv(args("output") + fileName.toString.split("/").last))
     }
    

    【讨论】:

      【解决方案2】:

      导入 com.twitter.scalding._
      导入 com.twitter.scalding.JsonLine 导入 java.io._

      类 ParseJsonJob(args: Args) 扩展 Job(args) { val fields = ('device_guid,'service_name,'event_type)

          JsonLine(args("input"), fields)
          .read
          .filter ('service_name) { name: String => name == "myservice" }
          .write(Tsv(args("output") )   } }
      

      这对你有用。如果没有,请让我知道。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2014-06-19
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多