【Question Title】: mapreduce composite key sample - doesn't show the desired output
【Posted】: 2016-10-20 23:24:57
【Question Description】:

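I'm new to the MapReduce and Hadoop world. After trying some basic MapReduce programs, I wanted to try a composite key sample.

The input dataset looks like this:

Country, State, County, Population (in millions)

USA,California,Alameda,100

USA,California,Los Angeles,200

USA,California,Sacramento,100

USA,Florida,xxx,10

USA,Florida,yyy,12

The desired output should look like this: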

USA,California,400

USA,Florida,22

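Here, the Country+State fields together form the composite key. Instead, I get the output below: for some reason the population is not being summed. Can someone point out the mistake I'm making? Please also look at the Country.java class, which implements the WritableComparable interface; something may be wrong with that implementation.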

USA,California,100

USA,California,200

USA,California,100

USA,Florida,10

USA,Florida,12

The population is not being summed per Country+State.
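For reference, here is a plain-Java sketch (no Hadoop involved) of the result the job is meant to produce: group the records by the Country+State composite key and sum the population. The class name `CompositeKeySum` is hypothetical, the rows are the sample data above, and the `TreeMap` only loosely imitates the key sorting done during the shuffle. For these five rows, California comes to 100 + 200 + 100 = 400.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical Hadoop-free sketch of the expected job result.
// Each line is "country,state,county,population"; the composite key is "country,state".
class CompositeKeySum {

    static Map<String, Integer> sumByCountryState(List<String> lines) {
        // TreeMap keeps keys sorted, loosely mirroring the sort performed in the shuffle phase
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String key = fields[0].trim() + "," + fields[1].trim();
            int population = Integer.parseInt(fields[3].trim());
            // merge() adds this record's population to the running total for its composite key
            totals.merge(key, population, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> input = List.of(
                "USA,California,Alameda,100",
                "USA,California,Los Angeles,200",
                "USA,California,Sacramento,100",
                "USA,Florida,xxx,10",
                "USA,Florida,yyy,12");
        sumByCountryState(input).forEach((k, v) -> System.out.println(k + "," + v));
    }
}
```

Running `main` should print `USA,California,400` followed by `USA,Florida,22`.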

Here is the Country class, which implements the WritableComparable interface:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * The Country class implements WritableComparable to provide custom sorting for the group-by operation. It
 * sorts by country and then by state.
 */
public class Country implements WritableComparable<Country> {

    Text country;
    Text state;

    public Country(Text country, Text state) {
        this.country = country;
        this.state = state;
    }
    public Country() {
        this.country = new Text();
        this.state = new Text();

    }

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
     */
    public void write(DataOutput out) throws IOException {
        this.country.write(out);
        this.state.write(out);

    }

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
     */
    public void readFields(DataInput in) throws IOException {

        this.country.readFields(in);
        this.state.readFields(in);

    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Comparable#compareTo(java.lang.Object)
     */
    public int compareTo(Country pop) {
        if (pop == null)
            return 0;
        int intcnt = country.compareTo(pop.country);
        if (intcnt != 0) {
            return intcnt;
        } else {
            return state.compareTo(pop.state);

        }
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {

        return country.toString() + ":" + state.toString();
    }

}
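The `write`/`readFields` pair must write and read the fields in the same order, and `compareTo` decides the sort order of the keys. The sketch below models that contract without Hadoop: `CountryKey` is a hypothetical stand-in that uses `String` fields and `DataOutput.writeUTF`/`DataInput.readUTF` in place of `Text` (whose actual wire format differs), and `roundTrip` serializes a key to bytes and reads it back into a fresh object, roughly as happens to map output keys before they reach the reducer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical Hadoop-free stand-in for Country: String fields replace Text,
// and writeUTF/readUTF replace Text.write/Text.readFields.
class CountryKey implements Comparable<CountryKey> {
    String country = "";
    String state = "";

    CountryKey() { }

    CountryKey(String country, String state) {
        this.country = country;
        this.state = state;
    }

    // Fields are written in a fixed order...
    void write(DataOutput out) throws IOException {
        out.writeUTF(country);
        out.writeUTF(state);
    }

    // ...and must be read back in exactly the same order.
    void readFields(DataInput in) throws IOException {
        country = in.readUTF();
        state = in.readUTF();
    }

    // Sort by country first, then by state, as Country.compareTo does.
    @Override
    public int compareTo(CountryKey other) {
        int byCountry = country.compareTo(other.country);
        return byCountry != 0 ? byCountry : state.compareTo(other.state);
    }

    @Override
    public String toString() {
        return country + ":" + state;
    }

    // Serialize a key to bytes, then deserialize it into a new, empty object.
    static CountryKey roundTrip(CountryKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            CountryKey copy = new CountryKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CountryKey original = new CountryKey("USA", "California");
        CountryKey copy = roundTrip(original);
        System.out.println(copy + " " + (copy.compareTo(original) == 0)); // prints "USA:California true"
    }
}
```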

Driver:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class CompositeKeyDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "CompositeKeyDriver");

        // first argument: the job itself
        // second argument: location of the input dataset
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // first argument: the job itself
        // second argument: location of the output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(CompositeKeyDriver.class);

        job.setMapperClass(CompositeKeyMapper.class);

        job.setReducerClass(CompositeKeyReducer.class);

        job.setOutputKeyClass(Country.class);

        job.setOutputValueClass(IntWritable.class);

        // store the second argument (the output directory) in a Path variable
        Path outputPath = new Path(args[1]);

        // delete the output directory (recursively) from HDFS so that we don't have to delete it by hand before each run
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Mapper:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


// The first two type parameters are the input key and input value. Input key = offset of each line (remember that each line is a record). Input value = the line itself.
// The last two type parameters are the output key and output value of the Mapper. BTW, the mapper's output is stored on the local file system, not on HDFS.
// Output key = a Country object. Output value = population in millions for that Country + State combination.


public class CompositeKeyMapper extends Mapper<LongWritable, Text, Country, IntWritable> {

    /** The cntry. */
    Country cntry = new Country();

    /** The country text. */
    Text cntText = new Text();

    /** The state text. */
    Text stateText = new Text();

    // population in a Country + State
    IntWritable populat = new IntWritable();

    /**
     * Reducers are optional in MapReduce: if the job is configured with zero reduce tasks, the output of the Mapper
     * is written directly to the output path without sorting.
     */
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // The record reader hands each line (record) to the Mapper.
        // The line is split on the delimiter ","
        String line = value.toString();

        String[] keyvalue = line.split(",");

        // Country is the first field of each record
        cntText.set(keyvalue[0]);

        // State is the second field of each record
        stateText.set(keyvalue[1]);

        // This is the population. BTW, we can't pass Java primitive types to the Context object: keys and values are
        // serialized through the Writable interface, so we use the equivalent Writable type provided by the MapReduce framework.
        populat.set(Integer.parseInt(keyvalue[3]));

        // Create a Country object, passing the country name and state to the constructor
        Country cntry = new Country(cntText, stateText);

        // Pass the Country object and its population to the context.
        // Remember that Country implements the WritableComparable interface, which extends both Writable and Java's
        // Comparable interface. That implementation is in the Country.java class.
        // Because Country implements WritableComparable, Country objects can be sorted in the shuffle phase. Without
        // that interface, the objects couldn't be sorted.
        context.write(cntry, populat);

    }
}

Reducer:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;


// Remember that the two output type parameters of the Mapper class become the first two input type parameters of the Reducer class.

public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, IntWritable> {

    // The first parameter of the reduce method is "Country". The Country object holds the country name and the state name (see the Country.java class for details).
    // The second parameter, "values", is the collection of populations for Country+State (this is the composite key).

    public void reduce(Country key, Iterator<IntWritable> values, Context context) throws IOException, InterruptedException {

        int numberofelements = 0;

        int cnt = 0;

        while (values.hasNext()) {

            cnt = cnt + values.next().get();

        }

        context.write(key, new IntWritable(cnt));

    }

}

【Discussion】:

    Tags: java hadoop mapreduce


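    【Solution 1】:

    You are using HashPartitioner, so your Country class needs to implement the hashCode() method.

    Currently it uses the default hashCode() implementation inherited from Object, which will prevent your keys from being grouped correctly.

    Here is an example hashCode() method: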

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((country == null) ? 0 : country.hashCode());
        result = prime * result + ((state == null) ? 0 : state.hashCode());
        return result;
    }
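To see why `hashCode()` matters for partitioning, the sketch below applies the formula `HashPartitioner.getPartition` uses, `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`, to two kinds of plain-Java keys. `PartitionDemo`, `IdentityKey`, and `HashedKey` are hypothetical names for illustration only. With a single reduce task the formula always yields partition 0, so an identity-based hash code can only split equal keys across partitions when the job runs more than one reducer.

```java
// Hypothetical sketch: the HashPartitioner formula applied to keys with and without hashCode().
class PartitionDemo {

    // Relies on Object.hashCode(), which is identity-based, like Country without an override.
    static final class IdentityKey {
        final String country;
        final String state;

        IdentityKey(String country, String state) {
            this.country = country;
            this.state = state;
        }
    }

    // Content-based hashCode() following the prime-31 pattern from the answer.
    static final class HashedKey {
        final String country;
        final String state;

        HashedKey(String country, String state) {
            this.country = country;
            this.state = state;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((country == null) ? 0 : country.hashCode());
            result = prime * result + ((state == null) ? 0 : state.hashCode());
            return result;
        }
    }

    // Same arithmetic as HashPartitioner.getPartition: clear the sign bit, then take the remainder.
    static int partition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        // Equal contents give equal hash codes, so both keys land in the same partition.
        System.out.println(partition(new HashedKey("USA", "California"), reducers)
                + " == " + partition(new HashedKey("USA", "California"), reducers));
        // Identity hash codes differ per object, so these may land in different partitions.
        System.out.println(partition(new IdentityKey("USA", "California"), reducers)
                + " vs " + partition(new IdentityKey("USA", "California"), reducers));
    }
}
```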
    

    Additional information:

    To be safe, you should set the Text objects rather than assign them. Currently, you assign them in the Country constructor:

    public Country(Text country, Text state) {
        this.country = country;
        this.state = state;
    }
    

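    You should change it to:

    public Country(Text country, Text state) {
        this(); // create both Text fields first; in Country as posted they start out null, so set() would throw a NullPointerException
        this.country.set(country);
        this.state.set(state);
    }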
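The reason for copying is that Hadoop's `Text` is mutable, and the mapper in the question reuses `cntText` and `stateText` for every record. The sketch below contrasts keeping a reference with copying the contents; `MutableText`, `AliasingKey`, `CopyingKey`, and `DefensiveCopyDemo` are hypothetical plain-Java stand-ins, not Hadoop classes. When the shared object changes, the aliasing key silently changes with it.

```java
// Hypothetical stand-in for a mutable Writable such as org.apache.hadoop.io.Text.
class MutableText {
    private String value = "";

    void set(String v) { value = v; }

    void set(MutableText other) { value = other.value; }

    @Override
    public String toString() { return value; }
}

// Keeps the caller's object: later changes to it leak into the key.
class AliasingKey {
    final MutableText country;

    AliasingKey(MutableText country) { this.country = country; }
}

// Owns its own object and copies the contents, as the answer recommends.
class CopyingKey {
    final MutableText country = new MutableText();

    CopyingKey(MutableText country) { this.country.set(country); }
}

class DefensiveCopyDemo {
    public static void main(String[] args) {
        MutableText reused = new MutableText(); // like cntText, reused for every record
        reused.set("USA");
        AliasingKey aliased = new AliasingKey(reused);
        CopyingKey copied = new CopyingKey(reused);
        reused.set("Canada"); // the mapper moves on to the next record
        System.out.println(aliased.country + " " + copied.country); // prints "Canada USA"
    }
}
```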
    

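    【Comments】:

    • How do I implement the hashCode method? Could you explain it or point me to a link where I can learn about it?
    • I've added an example for you. A quick Google search will turn up plenty of resources on hash codes.
    • Thanks a lot. I'll implement it and let you know once it works.
    • If we implement hashCode(), shouldn't we also implement equals()?
    • That's up to you. Your question was why your keys aren't being grouped, which is purely a hashCode issue; see HashPartitioner. If you want to put your objects in a collection, then yes, you need equals.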
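On the `equals()` question: the general Java contract is that objects which are `equals()` must return the same `hashCode()`, and for a key it is sensible to keep `equals()` consistent with `compareTo()` as well. Below is a minimal plain-Java sketch using a hypothetical `StateKey` class with `String` fields instead of `Text`; `Objects.hash(country, state)` combines the fields with the same 31-based formula as the hand-written `hashCode()` in the answer.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical Hadoop-free key with equals() and hashCode() kept consistent.
final class StateKey {
    private final String country;
    private final String state;

    StateKey(String country, String state) {
        this.country = country;
        this.state = state;
    }

    // Two keys are equal when both fields are equal (null-safe).
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof StateKey)) return false;
        StateKey other = (StateKey) o;
        return Objects.equals(country, other.country) && Objects.equals(state, other.state);
    }

    // Equal keys produce equal hash codes, as the contract requires.
    @Override
    public int hashCode() {
        return Objects.hash(country, state);
    }

    public static void main(String[] args) {
        Set<StateKey> keys = new HashSet<>();
        keys.add(new StateKey("USA", "Florida"));
        keys.add(new StateKey("USA", "Florida"));
        System.out.println(keys.size()); // prints 1
    }
}
```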
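    【Solution 2】:

    The reducer problem is now fixed. I didn't change any code; all I did was restart my Cloudera Hadoop image.

    While debugging, I noticed the following. Can someone comment on these observations?

    1. When I frequently change the code, rebuild the jar, and run the MapReduce jar, the changes are sometimes not reflected in the output. I'm not sure whether the Hadoop daemons need to be restarted from time to time.

    【Comments】:

    • This happens sometimes. I'm not sure whether Hadoop or the IDE is the cause, but most likely it's the IDE.
    【Solution 3】:

    I ran into the same problem as Basam, but restarting my Cloudera image wasn't enough to fix it.

    In the CompositeKeyReducer class, I replaced Iterator with Iterable and changed a few other lines: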

    @Override
    public void reduce(Country key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

        int cnt = 0;

        for (IntWritable value : values) {
            cnt += value.get();
        }

        context.write(key, new IntWritable(cnt));
    }

    Result:

    USA:CA 500

    USA:Florida 22
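The underlying cause is that `Reducer.reduce` takes an `Iterable`, so a method declared with an `Iterator` parameter is an overload that the framework never calls; the inherited default `reduce` then writes every value through unchanged, which matches the unsummed output in the question. The Hadoop-free sketch below models this: `BaseReducer` is a hypothetical stand-in for `Reducer` (its `run` method plays the framework's role), and `IteratorReducer`/`IterableReducer` mirror the question's and this answer's signatures. Adding `@Override` to the `Iterator` version would make the compiler reject it, which is a cheap way to catch this mistake.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.mapreduce.Reducer: the default reduce
// is an identity pass-through that emits every value unchanged.
class BaseReducer {
    final List<String> output = new ArrayList<>();

    void reduce(String key, Iterable<Integer> values) {
        for (Integer v : values) {
            output.add(key + " " + v);
        }
    }

    // Plays the framework's role: it only ever calls the Iterable version.
    void run(String key, List<Integer> values) {
        reduce(key, values);
    }
}

// Question's signature: an Iterator parameter makes this an overload, never invoked by run().
class IteratorReducer extends BaseReducer {
    void reduce(String key, Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        output.add(key + " " + sum);
    }
}

// Fixed signature: matches the base method, so @Override compiles and this version runs.
class IterableReducer extends BaseReducer {
    @Override
    void reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        output.add(key + " " + sum);
    }
}

class OverloadDemo {
    public static void main(String[] args) {
        List<Integer> california = List.of(100, 200, 100);
        BaseReducer broken = new IteratorReducer();
        broken.run("USA:California", california);
        BaseReducer fixed = new IterableReducer();
        fixed.run("USA:California", california);
        System.out.println(broken.output);
        System.out.println(fixed.output);
    }
}
```

Running `OverloadDemo.main` should print `[USA:California 100, USA:California 200, USA:California 100]` followed by `[USA:California 400]`.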

    【Comments】:
