MapReduce是一种可用于数据处理的编程模型。该模型比较简单,但是想写出有用的程序却不太容易。Hadoop可以运行Java、Ruby和Python等语言的MapReduce。最重要的是MapReduce本质上是并行运行的,因此可以将大规模的数据分析任务分发给任何一个拥有足够多机器的数据中心。MapReduce的优势在于处理大规模数据集,这里先使用广为流传的气象数据集作为例子来实现一个简单的demo。在实现之前,我们先来介绍一个这个数据集。首先该数据集的数据记录的格式如下所示,

关于MapReduce(一)

数据文件按照日期和气象站进行组织。从1901到2001年,每一年都有一个目录,每一个目录中包含各个气象站该年气象数据的打包文件及其说明文件,例如,1999年对应文件夹下面就包含下面的记录:

关于MapReduce(一)

气象台有成千上万个,所以整个数据集由大量的小文件组成。一般情况下,处理小量的大型文件更容易、更高效,因此,这些数据集需要进行预处理,将每年的数据文件拼接成一个单独的文件。

对于该数据集,我们可以使用很多种方式来进行分析,首先,我们就来看一下使用Unix工具类分析数据,传统处理按行存储数据的工具是awk。以下的代码是用来实现每年的最高气温。

 

#!/usr/bin/env bash

for year in all/*

do

    echo –ne `basename $year .gz` “\t”

    gunzip –c $year | \

        awk ‘{ temp = substr($0,88,5) + 0;

              q = substr($0,93,1);

if(temp!=9999 && q ~ /[01459]/ && temp > max) max = temp}

           END {print max}’

done

这个脚本循环遍历按年压缩的数据文件,首先显示年份,然后使用awk处理每一个文件。awk从数据中提取两个字段:气温和质量代码,其中通过质量代码来检测读取的数值是否有疑问或错误。如果没有错误,就将该值和之前的最大值进行比较,如果比之前的数值大,就将最大值替换为该值。处理完文件的所有行后,再执行END块中的代码并在屏幕上输出最大气温值。

为了加快速度,我们需要并行处理程序来进行数据分析。从理论上讲,很简单:我们可以使用计算机上所有可用的硬件线程来处理,每个线程负责处理不同年份的数据。但这样做还有一些问题:

  1. 将任务分成大小相同的作业通常并不是一件容易的事情。对于该数据集来说,每一年数据文件的大小差异较大,因此分成多个线程去运行,总的运行时间还是取决于运行时间最长的那个作业。另一种更好的办法是将输入数据分成固定大小的块,然后每块分到各个进程去执行,这样的话,如果有一些进程可以处理更多的数据,我们就可以给他分配更多的数据去处理。
  2. 合并各个独立进程的运行结果,可能还需要额外进行处理。对于该数据集来说,每年的结果独立于其他年份,所以可能需要把所有结果拼接起来,然后再按年份进行排序。
  3. 受限于单台计算机的处理能力。即便用上所有处理器,也会花上几十分钟。而且某些数据集的增长可能会超出单台计算机的处理能力。

因此,虽然并行处理也是可行的,但实际上也很麻烦。可以借助于Hadoop类似框架来解决这些问题。

为了充分利用Hadoop提供的并行处理优势,我们需要将查询表示成MapReduce作业。

MapReduce任务过程分为两个阶段:Map和Reduce。

Map阶段的输入是NCDC原始数据。我们选择文本格式作为输入格式,将数据集的每一行作为文本输入。键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息。我们的Map函数很简单,只需要将年份和气温属性提取出来即可。为了全面的了解map的工作方式,我们考虑以下输入数据的示例数据:

关于MapReduce(一)

这些行以键-值对的方式作为Map函数的输入:

关于MapReduce(一)

key是文件中的行偏移量,Map函数并不需要这个信息,所以将其忽略。Map函数的功能仅限于提取年份和气温信息,并将它们作为输出:

关于MapReduce(一)

Map函数的输出经由MapReduce框架处理后,最后发送到Reduce函数。这个处理过程基于键来对键-值对进行排序和分组。因此,在这一示例中,reduce函数看到的是如下输入:

关于MapReduce(一)

每一年份后紧跟着一系列气温数据。reduce函数现在要做的是遍历整个列表并从中找出最大的读数:

关于MapReduce(一)

这是最终输出结果,每一年的全球最高气温纪录。

整个数据流如图所示。

关于MapReduce(一)

了解了MapReduce的原理后,下一步就是使用代码去实现。我们需要三个程序:Map、Reduce、运行作业的代码。Map函数由Mapper类来表示,后者声明一个抽象的map()方法,具体代码如下,

 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class MaxTemperatureMapper extends MapReduceBase implements 

        Mapper< LongWritable ,Text,Text, IntWritable > {

        private static final int MISSING = 9999;

        

        @Override

        public void map(LongWritable key,Text value,Context context)

             throws IOException,InterruptedException {

                 String line = value.toString();

                 String year = line.substring(15,19);

                 int airTemperature;

                 if(line.charAt(87) == ‘+’){

                     airTemperature = Integer.parseInt(line.substring(88,92));

                 }else{

                     airTemperature = Integer.parseInt(line.substring(87,92));

                 }

                 String quality = line.substring(92,93);

             if(airTemperature != MISSING && quality.matches(“[01459]”)){

                 context.write(new Text(year),new IntWritable(airTemperature));

             }

        }

    }

 

以类似方法用Reducer来定义reduce函数,如下所示,

 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class MaxTemperatureReducer extends Reducer< Text , IntWritable,Text, IntWritable > {

        

        @Override

        public void reduce(Text key,Iterable<IntWritable> values,Context context)

             throws IOException,InterruptedException {

                 int maxValue= Integer. MIN_VALUE;

                 for(IntWritable value:values){

                     maxValue = Math.max(maxValue,value.get());

             }

                 context.write(key,new IntWritable(maxValue))

        }

    }

 

第三部分代码负责运行MapReduce作业,只需要引入mapper类、reducer类和一些配置即可,这里不做详细赘述。

相关文章: