I. MapJoin: Applying DistributedCache

     1. Introduction to MapReduce joins

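In real-world workloads it is very common to join two datasets on a key. If both datasets are small, the join can be done directly in memory. With large datasets, an in-memory join is likely to run out of memory (OOM), and MapReduce can be used to perform the join instead.
MapReduce joins fall into two main categories: MapJoin and ReduceJoin.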

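First, ReduceJoin:
(1) Map phase: the two datasets, data1 and data2, are each read by map tasks and parsed into key-value pairs, with the join field as the key and the queried fields as the value. Each record is also tagged with its source, data1 or data2.
(2) Reduce phase: each reduce task receives the records from data1 and data2 that share the same key and computes their cross product on the reduce side. The most direct consequence is heavy memory use, which can lead to OOM when a single key has many records.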
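The two phases above can be sketched in plain Java, without any Hadoop classes. This is only an illustration of the idea: the class name `ReduceSideJoinSketch`, the `{joinKey, payload}` row layout, and the `data1`/`data2` tag strings are assumptions made for the example, and the `TreeMap` merely stands in for the shuffle that groups records by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of a reduce-side join; no Hadoop classes are used.
class ReduceSideJoinSketch {

    // "Map" phase plus shuffle: tag every record with its source and group by join key.
    // Each input row is {joinKey, payload}; each grouped entry is {sourceTag, payload}.
    static Map<String, List<String[]>> shuffle(List<String[]> data1, List<String[]> data2) {
        Map<String, List<String[]>> grouped = new TreeMap<>();
        for (String[] row : data1) {
            grouped.computeIfAbsent(row[0], k -> new ArrayList<>()).add(new String[] {"data1", row[1]});
        }
        for (String[] row : data2) {
            grouped.computeIfAbsent(row[0], k -> new ArrayList<>()).add(new String[] {"data2", row[1]});
        }
        return grouped;
    }

    // "Reduce" phase for one key: split the values by tag, then emit the cross product.
    // Both lists are held in memory, which is where the memory pressure comes from.
    static List<String> reduce(String key, List<String[]> tagged) {
        List<String> left = new ArrayList<>();
        List<String> right = new ArrayList<>();
        for (String[] t : tagged) {
            if (t[0].equals("data1")) {
                left.add(t[1]);
            } else {
                right.add(t[1]);
            }
        }
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                out.add(key + "\t" + l + "\t" + r);
            }
        }
        return out;
    }

    static List<String> join(List<String[]> data1, List<String[]> data2) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String[]>> e : shuffle(data1, data2).entrySet()) {
            out.addAll(reduce(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> data1 = Arrays.asList(new String[] {"1", "Toy Story"}, new String[] {"2", "Jumanji"});
        List<String[]> data2 = Arrays.asList(new String[] {"1", "5"}, new String[] {"1", "3"}, new String[] {"3", "4"});
        join(data1, data2).forEach(System.out::println);
    }
}
```

Keys that appear in only one dataset produce no output here, so the sketch behaves like an inner join.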

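Next, MapJoin:
MapJoin suits joins in which one of the datasets is small. The small dataset is loaded into memory in full and indexed by the join key. The large dataset is then used as the MapTask input, and each call to map() looks up the matching entry in memory. The joined result is emitted by key. This approach uses Hadoop's DistributedCache to distribute the small dataset to every compute node; each node that runs a map task must load the data into memory and index it by the join key.
(The map tasks read the large table. Before reading it, the small table is loaded into memory in the setup() method.)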
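As a rough illustration of the setup()/map() split described above, here is a plain-Java sketch with no Hadoop dependencies. The class name `MapSideJoinSketch`, the row layout, and the sample keys are hypothetical; in a real job the small table would come from the distributed cache rather than from a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of a map-side (hash) join; no Hadoop classes are used.
class MapSideJoinSketch {
    // In-memory index over the small dataset: join key -> payload.
    private final Map<String, String> index = new HashMap<>();

    // Stands in for Mapper.setup(): runs once per task, before any map() call.
    void setup(List<String[]> smallTable) {
        for (String[] row : smallTable) {
            index.put(row[0], row[1]);
        }
    }

    // Stands in for Mapper.map(): probes the index for one record of the large dataset.
    // Returns null when the key has no match, which gives inner-join semantics.
    String map(String joinKey, String payload) {
        String matched = index.get(joinKey);
        return matched == null ? null : joinKey + "\t" + payload + "\t" + matched;
    }

    public static void main(String[] args) {
        MapSideJoinSketch task = new MapSideJoinSketch();
        task.setup(Arrays.asList(new String[] {"k1", "small-a"}, new String[] {"k2", "small-b"}));
        List<String> joined = new ArrayList<>();
        for (String[] big : Arrays.asList(new String[] {"k2", "big-x"}, new String[] {"k9", "big-y"})) {
            String out = task.map(big[0], big[1]);
            if (out != null) {
                joined.add(out);
            }
        }
        System.out.println(joined);
    }
}
```

Because every record is joined inside the map call, no shuffle or reduce phase is needed, which is why the driver in this section can set the number of reduce tasks to 0.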

    2. Requirements

There are two datasets, movies.dat and ratings.dat, with the following formats:

movies.dat:
     1::Toy Story (1995)::Animation|Children's|Comedy
     2::Jumanji (1995)::Adventure|Children's|Fantasy
     3::Grumpier Old Men (1995)::Comedy|Romance
     Fields: movieid, moviename, movietype

ratings.dat:
    1::1193::5::978300760
    1::661::3::978302109
    1::914::3::978301968

Fields: userid, movieid, rate, timestamp

The two tables must be joined so that the final output contains the following six fields:
movieid, userid, rate, moviename, movietype, timestamp

 

     3. Implementation

Step 1: define the MovieRate class, which makes the data easy to sort and serialize
    

package com.ghgj.mr.mymapjoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class MovieRate implements WritableComparable<MovieRate> {
    private String movieid;
    private String userid;
    private int rate;
    private String movieName;
    private String movieType;
    private long ts;

    public String getMovieid() {
        return movieid;
    }

    public void setMovieid(String movieid) {
        this.movieid = movieid;
    }

    public String getUserid() {
        return userid;
    }

    public void setUserid(String userid) {
        this.userid = userid;
    }

    public int getRate() {
        return rate;
    }

    public void setRate(int rate) {
        this.rate = rate;
    }

    public String getMovieName() {
        return movieName;
    }

    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }

    public String getMovieType() {
        return movieType;
    }

    public void setMovieType(String movieType) {
        this.movieType = movieType;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }

    // Hadoop needs a no-argument constructor to create instances during deserialization
    public MovieRate() {
    }

    public MovieRate(String movieid, String userid, int rate, String movieName,
            String movieType, long ts) {
        this.movieid = movieid;
        this.userid = userid;
        this.rate = rate;
        this.movieName = movieName;
        this.movieType = movieType;
        this.ts = ts;
    }

    // Tab-separated output in the required field order
    @Override
    public String toString() {
        return movieid + "\t" + userid + "\t" + rate + "\t" + movieName
                + "\t" + movieType + "\t" + ts;
    }

    // Serialization: the field order here must match readFields()
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movieid);
        out.writeUTF(userid);
        out.writeInt(rate);
        out.writeUTF(movieName);
        out.writeUTF(movieType);
        out.writeLong(ts);
    }

    // Deserialization: reads the fields in the same order write() wrote them
    @Override
    public void readFields(DataInput in) throws IOException {
        this.movieid = in.readUTF();
        this.userid = in.readUTF();
        this.rate = in.readInt();
        this.movieName = in.readUTF();
        this.movieType = in.readUTF();
        this.ts = in.readLong();
    }

    // Sorts by movieid, then userid, both in descending string order
    @Override
    public int compareTo(MovieRate mr) {
        int it = mr.getMovieid().compareTo(this.movieid);
        if (it == 0) {
            return mr.getUserid().compareTo(this.userid);
        } else {
            return it;
        }
    }
}
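To see why write() and readFields() must agree on field order, and what ordering compareTo() produces, the following plain-Java sketch mirrors the class with three of its fields. `MovieRateSketch` and `roundTrip` are hypothetical names for this illustration; the sketch implements `Comparable` instead of Hadoop's `WritableComparable` so that it runs without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Reduced stand-in for MovieRate: same write/readFields/compareTo pattern, fewer fields.
class MovieRateSketch implements Comparable<MovieRateSketch> {
    String movieid;
    String userid;
    int rate;

    MovieRateSketch() {
    }

    MovieRateSketch(String movieid, String userid, int rate) {
        this.movieid = movieid;
        this.userid = userid;
        this.rate = rate;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(movieid);
        out.writeUTF(userid);
        out.writeInt(rate);
    }

    // Reads the fields in exactly the order write() produced them.
    void readFields(DataInput in) throws IOException {
        movieid = in.readUTF();
        userid = in.readUTF();
        rate = in.readInt();
    }

    // Comparing "other to this" instead of "this to other" yields descending order.
    @Override
    public int compareTo(MovieRateSketch other) {
        int c = other.movieid.compareTo(this.movieid);
        return c != 0 ? c : other.userid.compareTo(this.userid);
    }

    // Serializes to a byte array and deserializes into a fresh instance.
    static MovieRateSketch roundTrip(MovieRateSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            MovieRateSketch copy = new MovieRateSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MovieRateSketch copy = roundTrip(new MovieRateSketch("1193", "1", 5));
        System.out.println(copy.movieid + "\t" + copy.userid + "\t" + copy.rate);
        // Positive result: movieid "1" sorts after movieid "2".
        System.out.println(new MovieRateSketch("1", "1", 5).compareTo(new MovieRateSketch("2", "1", 3)));
    }
}
```

Note that the IDs are compared as strings, so "10" sorts as smaller than "2"; numeric ordering would require parsing the IDs first.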

Step 2: write the MapReduce program

package com.ghgj.mr.mymapjoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MovieRatingMapJoinMR {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        // job.setJarByClass(MovieRatingMapJoinMR.class);
        job.setJar("/home/hadoop/mrmr.jar");
        job.setMapperClass(MovieRatingMapJoinMRMapper.class);
        job.setMapOutputKeyClass(MovieRate.class);
        job.setMapOutputValueClass(NullWritable.class);
        // job.setReducerClass(MovieRatingMapJoinMReducer.class);
        // job.setOutputKeyClass(MovieRate.class);
        // job.setOutputValueClass(NullWritable.class);
        // Map-only job: the join is completed inside the mapper
        job.setNumReduceTasks(0);

        String minInput = args[0]; // small table, shipped through the distributed cache
        String maxInput = args[1]; // large table, used as the map input
        String output = args[2];
        FileInputFormat.setInputPaths(job, new Path(maxInput));

        // Delete the output directory if it already exists
        Path outputPath = new Path(output);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Register the small table as a cache file for every map task
        URI uri = new Path(minInput).toUri();
        job.addCacheFile(uri);

        boolean status = job.waitForCompletion(true);
        System.exit(status ? 0 : 1);
    }

    // The MovieRatingMapJoinMRMapper class referenced above is not included in this listing.
}
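The driver references MovieRatingMapJoinMRMapper, but its body does not appear in this section. The join it would need to perform can be sketched in plain Java as follows. This is not the author's mapper: `MovieRatingJoinSketch`, `loadMovies`, and `joinRating` are hypothetical names, the cached file is replaced by a generic `Reader`, and the matching rating line in `main` is made up for illustration because none of the three sample ratings refers to one of the three sample movies.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the movies/ratings map-side join; no Hadoop classes are used.
class MovieRatingJoinSketch {
    // movieid -> {moviename, movietype}, built once from the small table.
    private final Map<String, String[]> movies = new HashMap<>();

    // Stands in for setup(): reads movies.dat lines of the form movieid::moviename::movietype.
    void loadMovies(Reader cachedMovies) {
        try (BufferedReader br = new BufferedReader(cachedMovies)) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] f = line.split("::");
                if (f.length == 3) {
                    movies.put(f[0], new String[] {f[1], f[2]});
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stands in for map(): joins one ratings.dat line (userid::movieid::rate::timestamp).
    // Returns null for malformed lines and for ratings whose movie is unknown.
    String joinRating(String ratingLine) {
        String[] f = ratingLine.split("::");
        if (f.length != 4) {
            return null;
        }
        String[] movie = movies.get(f[1]);
        if (movie == null) {
            return null;
        }
        // Required output order: movieid, userid, rate, moviename, movietype, timestamp
        return String.join("\t", f[1], f[0], f[2], movie[0], movie[1], f[3]);
    }

    public static void main(String[] args) {
        MovieRatingJoinSketch sketch = new MovieRatingJoinSketch();
        sketch.loadMovies(new StringReader(
                "1::Toy Story (1995)::Animation|Children's|Comedy\n"
                + "2::Jumanji (1995)::Adventure|Children's|Fantasy\n"));
        // Illustrative rating line (not from the sample data) that matches movie 2.
        System.out.println(sketch.joinRating("9::2::4::978300000"));
        // Sample rating whose movie is not in the loaded subset: no output.
        System.out.println(sketch.joinRating("1::1193::5::978300760"));
    }
}
```

In a Hadoop mapper, the same lookup table would be filled in setup() from the file registered with job.addCacheFile(), and each joined record would be emitted as a MovieRate key with a NullWritable value.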

  MapReduce (4): Typical Programming Scenarios (2)


II. Custom OutputFormat: Classified Data Output

Implementation: define a custom OutputFormat, replace its RecordWriter, and rewrite write(), the method that actually outputs the data.

package com.ghgj.mr.score_outputformat;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
public class MyScoreOutputFormat extends TextOutputFormat<Text, NullWritable>{
 
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration configuration = job.getConfiguration();
         
        FileSystem fs = FileSystem.get(configuration);
        Path p1 = new Path("/score1/outpu1");
        Path p2 = new Path("/score2/outpu2");
         
        if(fs.exists(p1)){
            fs.delete(p1, true);
        }
        if(fs.exists(p2)){
            fs.delete(p2, true);
        }
         
        FSDataOutputStream fsdout1 = fs.create(p1);
        FSDataOutputStream fsdout2 = fs.create(p2);
        return new MyRecordWriter(fsdout1, fsdout2);
    }
     
    static class MyRecordWriter extends RecordWriter<Text, NullWritable>{
 
        FSDataOutputStream dout1 = null;
        FSDataOutputStream dout2 = null;
         
        public MyRecordWriter(FSDataOutputStream dout1, FSDataOutputStream dout2) {
            super();
            this.dout1 = dout1;
            this.dout2 = dout2;
        }
 
        @Override
        public void write(Text key, NullWritable value) throws IOException,
                InterruptedException {
            // Keys arrive as "1::<record>" or "2::<record>"; the prefix selects the output stream
             
            String[] strs = key.toString().split("::");
            if(strs[0].equals("1")){
                dout1.writeBytes(strs[1]+"\n");
            }else{
                dout2.writeBytes(strs[1]+"\n");
            }
        }
 
        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            IOUtils.closeStream(dout2);
            IOUtils.closeStream(dout1);
        }
    }
}

  

package com.ghgj.mr.score_outputformat;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class ScoreOutputFormatMR extends Configured implements Tool{
 
    // This run method acts as the driver
    @Override
    public int run(String[] args) throws Exception {
         
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Job job = Job.getInstance(conf);
         
        job.setMapperClass(ScoreOutputFormatMRMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
         
        job.setNumReduceTasks(0);
         
        // This is the default input component
        job.setInputFormatClass(TextInputFormat.class);
        // This is the default component for writing output data
//      job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputFormatClass(MyScoreOutputFormat.class);
         
        FileInputFormat.setInputPaths(job, new Path("/scorefmt"));
        Path output = new Path("/scorefmt/output");
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output)){
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
         
        boolean status = job.waitForCompletion(true);
        return status?0:1;
    }
 
    public static void main(String[] args) throws Exception {
         
        int run = ToolRunner.run(new ScoreOutputFormatMR(), args);
        System.exit(run);
    }
     
    static class ScoreOutputFormatMRMapper extends Mapper<LongWritable,  Text, Text, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
             
            String[] split = value.toString().split("\t");
            if(split.length-2 >= 6){
                context.write(new Text("1::"+value.toString()), NullWritable.get());
            }else{
                context.write(new Text("2::"+value.toString()), NullWritable.get());
            }
        }
    }
}
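The tag-then-route flow used by the mapper and the custom RecordWriter can be sketched in plain Java, with two in-memory writers standing in for the two HDFS output streams. `ClassifiedOutputSketch` and its members are hypothetical names, and the sample records are made up. Unlike the listing above, the sketch splits with a limit of 2 so that a record that itself contains "::" is kept whole; that is a choice made for this example.

```java
import java.io.StringWriter;

// Plain-Java sketch of classified output; no Hadoop classes are used.
class ClassifiedOutputSketch {
    // Stand-ins for the two output streams opened in getRecordWriter().
    final StringWriter out1 = new StringWriter();
    final StringWriter out2 = new StringWriter();

    // Mapper side: prefix each record with "1::" or "2::" using the same
    // field-count test as the mapper above (tab-separated, at least 8 fields -> "1").
    static String tag(String line) {
        String[] fields = line.split("\t");
        return (fields.length - 2 >= 6 ? "1::" : "2::") + line;
    }

    // RecordWriter side: strip the tag and append the record to the matching output.
    void write(String taggedKey) {
        String[] parts = taggedKey.split("::", 2);
        StringWriter target = parts[0].equals("1") ? out1 : out2;
        target.write(parts[1] + "\n");
    }

    public static void main(String[] args) {
        ClassifiedOutputSketch writer = new ClassifiedOutputSketch();
        writer.write(tag("a\tb\t1\t2\t3\t4\t5\t6")); // 8 fields, goes to out1
        writer.write(tag("c\td\t1\t2"));             // 4 fields, goes to out2
        System.out.print("out1: " + writer.out1);
        System.out.print("out2: " + writer.out2);
    }
}
```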

III. Custom InputFormat: Merging Small Files

      Step 1: define a custom InputFormat

package com.ghgj.mr.format.input;
 
import java.io.IOException;
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
public class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
    // Mark each small file as non-splittable so that one file yields exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
 
    @Override
    public RecordReader<NullWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

  Step 2: write the custom RecordReader

package com.ghgj.mr.format.input;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    private FileSplit fileSplit;
    private Configuration conf;
    private Text value = new Text();
    private boolean processed = false;
 
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }
 
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Allocate a byte array sized to the whole input split (the entire file)
            byte[] contents = new byte[(int) fileSplit.getLength()];
            // Use the FileSplit to locate the file in the file system
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                // Open an input stream on the file
                in = fs.open(file);
                // in is the input stream; contents is the byte array
                // that receives the data read from it
                IOUtils.readFully(in, contents, 0, contents.length);

                value.set(contents, 0, contents.length);

            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }
 
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }
 
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
 
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }
 
    @Override
    public void close() throws IOException {
        // do nothing
    }
}
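The reader's "exactly one record per file" behaviour comes from the processed flag: the first nextKeyValue() call reads the whole file and returns true, and every later call returns false. The plain-Java sketch below reproduces that state machine over an ordinary InputStream. `WholeFileReaderSketch` is a hypothetical name, and the explicit length argument stands in for fileSplit.getLength().

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of a whole-file record reader; no Hadoop classes are used.
class WholeFileReaderSketch {
    private final InputStream in;
    private final int length;
    private byte[] value;
    private boolean processed = false;

    WholeFileReaderSketch(InputStream in, int length) {
        this.in = in;
        this.length = length;
    }

    // Returns true once, after loading the entire content as a single value.
    boolean nextKeyValue() {
        if (processed) {
            return false;
        }
        byte[] contents = new byte[length];
        try (DataInputStream data = new DataInputStream(in)) {
            // readFully blocks until the array is filled or fails with EOFException.
            data.readFully(contents);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        value = contents;
        processed = true;
        return true;
    }

    byte[] getCurrentValue() {
        return value;
    }

    float getProgress() {
        return processed ? 1.0f : 0.0f;
    }

    public static void main(String[] args) {
        byte[] file = "line one\nline two\n".getBytes(StandardCharsets.UTF_8);
        WholeFileReaderSketch reader = new WholeFileReaderSketch(new ByteArrayInputStream(file), file.length);
        while (reader.nextKeyValue()) {
            System.out.println("record bytes: " + reader.getCurrentValue().length);
        }
        System.out.println("progress: " + reader.getProgress());
    }
}
```

Because the whole file is buffered in a single array, this pattern only suits files that fit comfortably in a task's memory, which matches the small-file use case of this section.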

   Step 3: write the MapReduce program

package com.ghgj.mr.format.input;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class SmallFilesConvertToBigMR extends Configured implements Tool {
     
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesConvertToBigMR(), args);
        System.exit(exitCode);
    }
 
    static class SmallFilesConvertToBigMRMapper extends
            Mapper<NullWritable, Text, Text, Text> {
         
        private Text filenameKey;
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }
 
        @Override
        protected void map(NullWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }
 
    static class SmallFilesConvertToBigMRReducer extends
            Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text filename, Iterable<Text> bytes,
                Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), bytes.iterator().next());
        }
    }
 
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Job job = Job.getInstance(conf, "combine small files to bigfile");
         
        job.setJarByClass(SmallFilesConvertToBigMR.class);
         
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setMapperClass(SmallFilesConvertToBigMRMapper.class);
 
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(SmallFilesConvertToBigMRReducer.class);
 
        // TextInputFormat is the default component for reading input data
//      job.setInputFormatClass(TextInputFormat.class);
        // Instead of the default input format, use the custom WholeFileInputFormat
        job.setInputFormatClass(WholeFileInputFormat.class);
         
         
        Path input = new Path("/smallfiles");
        Path output = new Path("/bigfile");
        FileInputFormat.setInputPaths(job, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
 
        int status = job.waitForCompletion(true) ? 0 : 1;
        return status;
    }
}
