需求描述:
从 HDFS 中获取数据后,需要根据字段 url 计算出 url_type。如果通过 Hive 的 left outer join 来实现,效率非常低。故将 url 的类型导入到 HBase 中,利用 HBase 快速查询的特点,结合 MapReduce 进行字段打标。
刚开始的mapreduce程序如下:
package com.bonc.db;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.bonc.URLMatch.HBaseMain;

/**
 * MapReduce job that reads pipe-delimited records, groups them by a composite
 * key built from selected columns, and tags every group with a value looked up
 * in HBase (by URL host first, then by a fallback field).
 */
public class DWA_S_D_USE_MB_COUNT_BASE2 {

    /**
     * Configures and submits the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String args[]) throws Exception {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println(
                    "Usage: DWA_S_D_USE_MB_COUNT_BASE2 <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = new Job(conf, "DWA_S_D_USE_MB_COUNT_BASE");
        job.setJarByClass(DWA_S_D_USE_MB_COUNT_BASE2.class);
        job.setMapperClass(DataCleanMapper.class);
        job.setReducerClass(DataCleanReduce.class);
        job.setNumReduceTasks(150);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Splits each input line on {@code |} and emits a composite key together
     * with the fields the reducer needs for the HBase lookup.
     */
    public static class DataCleanMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        /** Same expression the original {@code isNum} used, compiled once. */
        private static final Pattern NUMERIC = Pattern
                .compile("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$");

        /**
         * Emits one (key, value) pair per input line.
         *
         * <p>The key is built only when the record has more than 16 columns and
         * the value only when it has more than 15; otherwise the respective
         * part stays an empty string and the pair is still written.
         * NOTE(review): short records therefore all share the empty key;
         * confirm whether they should be dropped instead.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\\|");

            String url = "NULL";
            if (strs.length > 25) {
                // The parser is only needed when the URL column is present.
                url = new ParesURL().execute(strs[25], "HOST");
            }

            String keys = "";
            if (strs.length > 16) {
                keys = strs[0] + "|" + strs[1] + "|" + strs[2] + "|" + strs[3]
                        + "|" + strs[4] + "|" + use_seg(strs[5]) + "|"
                        + strs[11] + "|" + strs[16] + "|" + url + "|" + strs[7]
                        + "|" + strs[8] + "|" + strs[9] + "|" + strs[10] + "|";
            }

            String values = "";
            if (strs.length > 15) {
                values = url + "|" + strs[13] + "|" + strs[15] + "|" + "1";
            }

            context.write(new Text(keys), new Text(values));
        }

        /**
         * Extracts the two characters at index 11-12 of {@code start_date}
         * when they form a number in the range 00-23.
         *
         * <p>The characters are validated as plain ASCII digits. The previous
         * implementation relied on {@link #isNum(String)}, which also accepts
         * decimals such as {@code ".5"}; those then made
         * {@code Integer.parseInt} throw and failed the map task.
         *
         * @param start_date text that must be longer than 23 characters to be
         *                   considered; may be {@code null}
         * @return the two-digit segment, or {@code "**"} when the input is
         *         missing, too short, or does not hold a value in 00-23
         */
        public String use_seg(String start_date) {
            String s = "**";
            if (start_date != null && start_date.length() > 23) {
                char tens = start_date.charAt(11);
                char ones = start_date.charAt(12);
                if (isAsciiDigit(tens) && isAsciiDigit(ones)) {
                    int hour = (tens - '0') * 10 + (ones - '0');
                    if (hour <= 23) {
                        s = start_date.substring(11, 13);
                    }
                }
            }
            return s;
        }

        /** Returns whether {@code c} is one of the characters '0' to '9'. */
        private static boolean isAsciiDigit(char c) {
            return c >= '0' && c <= '9';
        }

        /**
         * Tells whether {@code str} is an optionally signed integer or decimal
         * literal. The empty string also matches, because every part of the
         * second alternative in the expression is optional.
         *
         * @param str text to test; must not be {@code null}
         * @return {@code true} when the whole string matches the expression
         */
        public static boolean isNum(String str) {
            return NUMERIC.matcher(str).matches();
        }
    }

    /**
     * Emits one output record per key, tagged with the value found in HBase
     * for the group's URL (or, failing that, for its second field).
     */
    public static class DataCleanReduce extends Reducer<Text, Text, Text, Text> {

        /** Marker used throughout the job for "no value". */
        private static final String NO_MATCH = "NULL";

        /** Fixed charset so row keys and cell values do not depend on the platform default. */
        private static final Charset UTF8 = Charset.forName("UTF-8");

        private static final byte[] FAMILY = "123".getBytes(UTF8);
        private static final byte[] QUALIFIER = "456".getBytes(UTF8);

        /**
         * Lookup table handle. Declared through the interface because the
         * object handed out by {@link HTablePool} is not guaranteed to be a
         * concrete {@code HTable}; a downcast can fail at runtime.
         */
        private HTableInterface table;

        /**
         * Walks all values of the group, remembering for each of the first
         * three pipe-separated fields the value from the last record that
         * supplied it, then writes a single tagged record for the key.
         */
        @Override
        protected void reduce(Text arg0, Iterable<Text> arg1, Context context)
                throws IOException, InterruptedException {
            String url = NO_MATCH;
            String visitIP = NO_MATCH;
            String value2 = NO_MATCH;

            for (Text c : arg1) {
                // split() drops trailing empty fields, so the array may be
                // shorter than three elements or even empty.
                String[] value = c.toString().split("\\|");
                if (value.length > 0) {
                    url = value[0];
                }
                if (value.length > 1) {
                    visitIP = value[1];
                }
                if (value.length > 2) {
                    value2 = value[2];
                }
            }

            String matchResult = urlMatch(url);
            if (matchResult.equals(NO_MATCH)) {
                matchResult = urlMatch(visitIP);
            }

            String output = matchResult + "|" + value2 + "|" + "1";
            context.write(arg0, new Text(output));
        }

        /** Releases the HBase table handle, if {@code setup} managed to obtain one. */
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
            if (table != null) {
                table.close();
                table = null;
            }
        }

        /** Obtains the HBase table handle used by {@link #urlMatch(String)}. */
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            HTablePool pool = new HTablePool(HBaseMain.conf, 1000);
            table = pool.getTable("22222");
        }

        /**
         * Looks up {@code url} as a row key and returns the content of cell
         * {@code 123:456}.
         *
         * @param url row key to look up; {@code null}, empty and {@code "NULL"}
         *            are treated as "nothing to look up"
         * @return the cell value decoded as UTF-8, or {@code "NULL"} when there
         *         is nothing to look up, the row or the cell does not exist,
         *         or the lookup fails with an {@link IOException}
         */
        public String urlMatch(String url) {
            if (url == null || url.isEmpty() || url.equals(NO_MATCH)) {
                return NO_MATCH;
            }
            try {
                Result ru = table.get(new Get(url.getBytes(UTF8)));
                if (ru.isEmpty()) {
                    return NO_MATCH;
                }
                // The row may exist without this particular column.
                byte[] cell = ru.getValue(FAMILY, QUALIFIER);
                return cell == null ? NO_MATCH : new String(cell, UTF8);
            } catch (IOException e) {
                // Best effort: a failed lookup leaves the record untagged
                // rather than failing the whole reduce task.
                System.err.println("HBase lookup failed for row key: " + url);
                e.printStackTrace();
                return NO_MATCH;
            }
        }
    }
}