1. 背景
近日帮外部门的同事处理一个小需求,就是将HDFS中2018年至今所有存储的sequence序列化文件读取出来,重新保存成文本格式,以便于他后续进行处理。由于同事主要做机器学习方向,对hadoop或spark方面不了解,所以我就想着这么小的需求,简单支持下即可,花个几分钟写了一个脚本提供给他,没想到,过了一天他又找到我,说脚本读取出来的文件大部分有问题…原来自己代码有bug
2. 初始版本
Spark或Hadoop读取sequence文件只需调用相应函数即可。
第一版本的spark程序代码如下:
1 package com.ws.test 2 3 import org.apache.hadoop.io.{BytesWritable, Text} 4 import org.apache.spark.{SparkConf, SparkContext} 5 6 object Test { 7 def main(args: Array[String]): Unit = { 8 if (args.length < 1) { 9 println("input param error: <method name>") 10 } else { 11 args(0) match { 12 case "deseqData" => deseqData(args) 13 case _ => 14 } 15 } 16 } 17 18 def deseqData(args: Array[String]): Unit ={ 19 20 if(args.length != 3){ 21 println("input param error: <method name> <input dir> <output dir>") 22 return 23 } 24 25 val conf = new SparkConf() 26 conf.setAppName(args(0)) 27 val sc = new SparkContext(conf) 28 29 val inputDir = args(1) 30 val outputDir = args(2) 31 32 sc.sequenceFile[Text, BytesWritable](s"hdfs://$inputDir") 33 .map(data => new String(data._2.getBytes)).saveAsTextFile(outputDir) 34 35 sc.stop() 36 } 37 }