需求描述:

从 HDFS 中获取数据后,需要根据字段 url 计算出 url_type。如果通过 Hive 的 left outer join 来实现,效率非常低。因此将 url 的类型导入到 HBase 中,利用 HBase 快速查询的特点,结合 MapReduce 对字段进行打标。

刚开始的mapreduce程序如下:

  1 package com.bonc.db;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.hbase.client.Get;
  8 import org.apache.hadoop.hbase.client.HTable;
  9 import org.apache.hadoop.hbase.client.HTablePool;
 10 import org.apache.hadoop.hbase.client.Result;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.Text;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 
 19 import com.bonc.URLMatch.HBaseMain;
 20 
 21 public class DWA_S_D_USE_MB_COUNT_BASE2 {
 22     public static void main(String args[]) throws Exception {
 23         Configuration conf = new Configuration();
 24         Job job = new Job(conf, "DWA_S_D_USE_MB_COUNT_BASE");
 25         job.setJarByClass(DWA_S_D_USE_MB_COUNT_BASE2.class);
 26         job.setMapperClass(DataCleanMapper.class);
 27         job.setReducerClass(DataCleanReduce.class);
 28         job.setNumReduceTasks(150);
 29         job.setOutputKeyClass(Text.class);
 30         job.setOutputValueClass(Text.class);
 31         job.setMapOutputKeyClass(Text.class);
 32         job.setMapOutputValueClass(Text.class);
 33         FileInputFormat.addInputPath(job, new Path(args[0]));
 34         FileOutputFormat.setOutputPath(job, new Path(args[1]));
 35         System.exit(job.waitForCompletion(true) ? 0 : 1);
 36     }
 37 
 38     public static class DataCleanMapper extends
 39             Mapper<LongWritable, Text, Text, Text> {
 40         @Override
 41         protected void map(LongWritable key, Text value, Context context)
 42                 throws IOException, InterruptedException {
 43             String lines = value.toString();
 44             String[] strs = lines.split("\\|");
 45             ParesURL pu = new ParesURL();
 46             String url = "NULL";
 47             if (strs.length > 25) {
 48                 url = pu.execute(strs[25], "HOST");
 49             }
 50             String keys = "";
 51             String values = "";
 52             if (strs.length > 16) {
 53                 keys = strs[0] + "|" + strs[1] + "|" + strs[2] + "|" + strs[3]
 54                         + "|" + strs[4] + "|" + use_seg(strs[5]) + "|"
 55                         + strs[11] + "|" + strs[16] + "|" + url + "|" + strs[7]
 56                         + "|" + strs[8] + "|" + strs[9] + "|" + strs[10] + "|";
 57             }
 58             if (strs.length > 15) {
 59                 values = url + "|" + strs[13] + "|" + strs[15] + "|" + "1";
 60             }
 61             context.write(new Text(keys), new Text(values));
 62         }
 63 
 64         public String use_seg(String start_date) {
 65             String s = "**";
 66             if (start_date.toString().length() > 23) {
 67                 if (isNum(start_date.toString().substring(11, 13))
 68                         && Integer.parseInt(start_date.toString().substring(11,
 69                                 13)) >= 0
 70                         && Integer.parseInt(start_date.toString().substring(11,
 71                                 13)) <= 23) {
 72                     s = start_date.toString().substring(11, 13);
 73                 }
 74             }
 75             return s;
 76         }
 77 
 78         public static boolean isNum(String str) {
 79             return str
 80                     .matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$");
 81         }
 82     }
 83 
 84     public static class DataCleanReduce extends Reducer<Text, Text, Text, Text> {
 85         private HTable table;
 86 
 87         @Override
 88         protected void reduce(Text arg0, Iterable<Text> arg1, Context context)
 89                 throws IOException, InterruptedException {
 90             String keys = arg0.toString();
 91             String value[] = { "" };
 92             String url = "NULL";
 93             String visitIP = "NULL";
 94             String value2 = "NULL";
 95             for (Text c : arg1) {
 96                 value = c.toString().split("\\|");
 97                 if (value.length > 0) {
 98                     url = value[0];
 99                 }
100                 if (value.length > 1) {
101                     visitIP = value[1];
102                 }
103                 if (value.length > 2) {
104                     value2 = value[2];
105                 }
106             }
107             String matchResult = urlMatch(url);
108             if (matchResult.equals("NULL")) {
109                 matchResult = urlMatch(visitIP);
110             }
111             String output = matchResult + "|" + value2 + "|" + "1";
112             // System.out.println(output+"+++++++++++++++++");
113             context.write(new Text(keys), new Text(output));
114         }
115 
116         @Override
117         protected void cleanup(Context context) throws IOException,
118                 InterruptedException {
119             super.cleanup(context);
120             table.close();
121         }
122 
123         @Override
124         protected void setup(Context context) throws IOException,
125                 InterruptedException {
126             // TODO Auto-generated method stub
127             super.setup(context);
128             HTablePool pool = new HTablePool(HBaseMain.conf, 1000);
129             table = (HTable) pool.getTable("22222");
130         }
131 
132         public String urlMatch(String url) {
133             String s = "NULL";
134             if (url == null || url.equals("NULL")) {
135                 s = "NULL";
136             } else {
137                 try {
138                     Get getu = new Get(url.getBytes());
139                     Result ru = table.get(getu);
140                     if (!ru.isEmpty()) {
141                         s = new String(ru.getValue("123".getBytes(), "456".getBytes()));
142                     }
143                 } catch (IOException e) {
144                     e.printStackTrace();
145                 }
146             }
147             return s;
148         }
149     }
150 }
View Code

相关文章:

  • 2022-12-23
  • 2022-02-26
  • 2021-07-03
  • 2021-10-19
  • 2021-08-17
  • 2021-08-24
猜你喜欢
  • 2022-02-05
  • 2021-11-17
  • 2021-12-20
  • 2022-12-23
  • 2021-05-14
  • 2021-05-11
相关资源
相似解决方案