【问题标题】:hadoop map reduce job with HDFS input and HBASE output具有 HDFS 输入和 HBASE 输出的 hadoop map reduce 作业
【发布时间】:2011-05-31 13:01:01
【问题描述】:

我是 hadoop 新手。 我有一个 MapReduce 作业,它应该从 Hdfs 获取输入并将 reducer 的输出写入 Hbase。我没有找到任何好的例子。

这是代码,运行此示例的错误是 Map 中的类型不匹配,预期 ImmutableBytesWritable 收到 IntWritable。

映射器类

public static class AddValueMapper extends Mapper < LongWritable,
 Text, ImmutableBytesWritable, IntWritable > {  

  /* input <key, line number : value, full line>
   *  output <key, log key : value >*/  
public void map(LongWritable key, Text value, 
     Context context)throws IOException, 
     InterruptedException {
  byte[] key;
  int value, pos = 0;
  String line = value.toString();
  String p1 , p2 = null;
  pos = line.indexOf("=");

   //Key part
   p1 = line.substring(0, pos);
   p1 = p1.trim();
   key = Bytes.toBytes(p1);   

   //Value part
   p2 = line.substring(pos +1);
   p2 = p2.trim();
   value = Integer.parseInt(p2);

   context.write(new ImmutableBytesWritable(key),new IntWritable(value));
  }
}

Reducer 类

public static class AddValuesReducer extends TableReducer<
  ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, 
   Context context) throws IOException, InterruptedException {

         long total =0;
         // Loop values
         while(values.iterator().hasNext()){
           total += values.iterator().next().get();
         }
         // Put to HBase
         Put put = new Put(key.get());
         put.add(Bytes.toBytes("data"), Bytes.toBytes("total"),
           Bytes.toBytes(total));
         Bytes.toInt(key.get()), total));
            context.write(key, put);
        }
    }

我只在 HDFS 上做过类似的工作,并且工作正常。

2013 年 6 月 18 日编辑。该学院项目于两年前顺利完成。对于作业配置(驱动程序部分),请检查正确答案。

【问题讨论】:

标签: java hadoop mapreduce hbase hdfs


【解决方案1】:

在 HBase 中批量加载数据的最佳和最快方法是使用 HFileOutputFormatCompliteBulkLoad 实用程序。

你会发现一个示例代码here:

希望这会有用:)

【讨论】:

  • 虽然此链接可能会回答问题,但最好在此处包含答案的基本部分并提供链接以供参考。如果链接页面发生更改,仅链接的答案可能会失效。
【解决方案2】:

这是解决问题的代码



驱动程序

HBaseConfiguration conf =  HBaseConfiguration.create();
Job job = new Job(conf,"JOB_NAME");
    job.setJarByClass(yourclass.class);
    job.setMapperClass(yourMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Intwritable.class);
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    TableMapReduceUtil.initTableReducerJob(TABLE,
            yourReducer.class, job);
    job.setReducerClass(yourReducer.class);
            job.waitForCompletion(true);


映射器和减速器

class yourMapper extends Mapper<LongWritable, Text, Text,IntWritable> {
//@overide map()
 }

class yourReducer
        extends
        TableReducer<Text, IntWritable, 
        ImmutableBytesWritable>
{
//@override reduce()
}

【讨论】:

  • 感谢您的回答,您的工作配置与解决我的问题相同。当我有时间收拾这个烂摊子时,我会编辑和格式化我的问题。
【解决方案3】:
 public void map(LongWritable key, Text value, 
 Context context)throws IOException, 
 InterruptedException {

将其更改为immutableBytesWritableintwritable

我不确定..希望它有效

【讨论】:

  • 感谢您的回答,代码贴正确,项目两年前完成。
【解决方案4】:

不确定为什么 HDFS 版本有效:通常您必须为作业设置输入格式,而 FileInputFormat 是一个抽象类。也许你遗漏了一些台词?比如

job.setInputFormatClass(TextInputFormat.class);

【讨论】:

  • 感谢您的回答,驱动部分有几个错误,现已解决。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-04-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多