【Question title】: MapReduce count and find average
【Posted】: 2017-04-09 10:14:30
【Question description】:

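I want to write a MapReduce program that reads the cust_key and balance values from a .tbl file. I concatenate the 2 values into one string and send it to the Reducer, where I count the cust_keys and compute the average balance for each segment. That is why I use the segment as the key.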

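In the reducer I want to split the string back into the 2 values, so that I can count the cust keys and sum the balances to get the average. But array[0] from the split gives me the whole string instead of its first value, and array[1] throws an ArrayIndexOutOfBoundsException. I hope this is clear.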
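A quick plain-Java check of what String.split returns may help narrow this down. The class name SplitCheck and the sample strings are illustrative, not from the question. When the delimiter does not occur in the value, split returns a one-element array holding the whole string. That matches both symptoms above: a[0] is the entire string and a[1] is out of bounds.

```java
import java.util.Arrays;

class SplitCheck {
    // Splits a value on "," the same way the question's reducer does
    static String[] splitValue(String value) {
        return value.trim().split(",");
    }

    public static void main(String[] args) {
        // A value in the format the mapper builds: cust_key + "," + balance
        System.out.println(Arrays.toString(splitValue("Customer#000008795,9794.8")));
        // prints [Customer#000008795, 9794.8]

        // A value without any comma: one element, so a[0] is the whole string
        String[] noComma = splitValue("Customer#000008795");
        System.out.println(noComma.length); // prints 1
        // noComma[1] would throw ArrayIndexOutOfBoundsException
    }
}
```

If the reducer ever receives a value without a comma, the split produces exactly the behavior described. Printing each value before splitting is a cheap way to confirm this.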

The code is as follows:

public class MapReduceTest {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {

        private Text segment = new Text();
        private Text word = new Text();
        private float balance = 0;

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\\|");
            balance = Float.parseFloat(line[5]);
            String cust_key = line[1];
            int nation = Integer.parseInt(line[3]);

            if ((balance > 8000) && (nation < 15) && (nation > 1)) {
                segment.set(line[6]);
                word.set(cust_key + "," + balance);
                context.write(segment, word);
            }
        }
    }

    public static class AvgReducer extends Reducer<Text, Text, Text, Text> {

        Text val = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String cust_key = "";
            float avg, sum = 0;
            int count = 0;
            for (Text v : values) {
                String[] a = v.toString().trim().split(",");
                cust_key += a[0];
            }

            val.set(cust_count);
            context.write(key, val);
        }
    }
}

Input data

8794|Customer#000008794|6dnUgJZGX73Kx1idr6|18|28-434-484-9934|7779.30|HOUSEHOLD|deposits detect furiously even requests. furiously ironic packages are slyly into th
8795|Customer#000008795|oA1cLUtWOAIFz5Douypbq1jHv glSE|9|19-829-732-8102|9794.80|BUILDING|totes. blithely unusual theodolites integrate carefully ironic foxes. unusual excuses cajole carefully carefully fi
8796|Customer#000008796|CzCzpV7SDojXUzi4165j,xYJuRv wZzn grYsyZ|24|34-307-411-6825|4323.03|AUTOMOBILE|s. pending, bold accounts above the sometimes express accounts 
8797|Customer#000008797|TOWDryHNNqp8bvgMW6 FAhRoLyG1ldu2bHcJCM6|2|12-517-522-5820|219.78|FURNITURE|ly bold pinto beans can nod blithely quickly regular requests. fluffily even deposits ru
8798|Customer#000008798|bIegyozQ5kzprN|15|25-472-647-6270|6832.96|AUTOMOBILE|es-- silent instructions nag blithely
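As a sanity check, the mapper's filter (balance > 8000 and 1 < nation < 15) can be run against these five rows in plain Java, without Hadoop. This is a sketch: TblFilter, mapLine and mapAll are hypothetical names. The field positions (1 = customer name, 3 = nation key, 5 = balance, 6 = segment) are the ones the question's mapper uses. Only the 8795 row passes, which agrees with the Pig output in Solution 1.

```java
import java.util.ArrayList;
import java.util.List;

class TblFilter {
    // Mirrors the question's mapper: returns "segment\tcust_key,balance" when the
    // row passes the filter, or null when it is filtered out or has too few fields
    static String mapLine(String line) {
        String[] f = line.split("\\|");
        if (f.length < 7) {
            return null;
        }
        float balance = Float.parseFloat(f[5]);
        int nation = Integer.parseInt(f[3]);
        if (balance > 8000 && nation > 1 && nation < 15) {
            return f[6] + "\t" + f[1] + "," + balance;
        }
        return null;
    }

    // Applies mapLine to every row and keeps the rows that produced output
    static List<String> mapAll(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String kv = mapLine(line);
            if (kv != null) {
                out.add(kv);
            }
        }
        return out;
    }

    // The five sample rows from the question
    static List<String> sampleRows() {
        return List.of(
            "8794|Customer#000008794|6dnUgJZGX73Kx1idr6|18|28-434-484-9934|7779.30|HOUSEHOLD|deposits detect furiously even requests. furiously ironic packages are slyly into th",
            "8795|Customer#000008795|oA1cLUtWOAIFz5Douypbq1jHv glSE|9|19-829-732-8102|9794.80|BUILDING|totes. blithely unusual theodolites integrate carefully ironic foxes. unusual excuses cajole carefully carefully fi",
            "8796|Customer#000008796|CzCzpV7SDojXUzi4165j,xYJuRv wZzn grYsyZ|24|34-307-411-6825|4323.03|AUTOMOBILE|s. pending, bold accounts above the sometimes express accounts",
            "8797|Customer#000008797|TOWDryHNNqp8bvgMW6 FAhRoLyG1ldu2bHcJCM6|2|12-517-522-5820|219.78|FURNITURE|ly bold pinto beans can nod blithely quickly regular requests. fluffily even deposits ru",
            "8798|Customer#000008798|bIegyozQ5kzprN|15|25-472-647-6270|6832.96|AUTOMOBILE|es-- silent instructions nag blithely");
    }

    public static void main(String[] args) {
        // Prints a list containing only the BUILDING row for Customer#000008795
        System.out.println(mapAll(sampleRows()));
    }
}
```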

Stack trace

java.lang.Exception: java.lang.ArrayIndexOutOfBoundsException: 1
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
        at MapReduceTest$AvgReducer.reduce(MapReduceTest.java:69)
        at MapReduceTest$AvgReducer.reduce(MapReduceTest.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
17/04/12 18:40:33 INFO mapreduce.Job: Job job_local806960399_0001 running in uber mode : false
17/04/12 18:40:33 INFO mapreduce.Job:  map 100% reduce 0%
17/04/12 18:40:33 INFO mapreduce.Job: Job job_local806960399_0001 failed with state FAILED due to: NA
17/04/12 18:40:33 INFO mapreduce.Job: Counters: 35

Update

Reducer

    public static class AvgReducer extends Reducer<Text,Text,Text,Text> {

    Logger log = Logger.getLogger(AvgReducer.class.getName());

    public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {

            float sumBalance=0,avgBalance = 0;

            int cust_count = 1;

            for(Text v : values){
               String[] a = v.toString().trim().split(",");

               //c2 += " i "+i+" "+a[0]+"\n";

               sumBalance +=Float.parseFloat(a[a.length-1]);

               cust_count++;
            }

            avgBalance = sumBalance / cust_count;


            context.write(key,new Text(avgBalance+" "+cust_count));

     }

   }

Stack trace

java.lang.Exception: java.lang.NumberFormatException: For input string: "8991.715 289"
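The string in this error, "8991.715 289", has exactly the shape the updated reducer writes (avgBalance + " " + cust_count). That suggests the reducer is parsing its own output. The driver is not shown, so the following is an assumption. If the driver registers AvgReducer as a combiner (job.setCombinerClass(AvgReducer.class)), Hadoop may run it on the map output and hand its results to the reducer, where split(",") finds no comma. The plain-Java sketch below (class and method names are hypothetical) reproduces only that parsing step. Separately, cust_count starts at 1 in the update, so it ends up one larger than the number of values and the average comes out too low.

```java
class ReparseDemo {
    // Same parsing as the updated reducer: the last comma-separated field is the balance
    static float lastFieldAsBalance(String value) {
        String[] a = value.trim().split(",");
        return Float.parseFloat(a[a.length - 1]);
    }

    // Same output format as the updated reducer: avg + " " + count
    static String reducerOutput(float avg, int count) {
        return avg + " " + count;
    }

    public static void main(String[] args) {
        // A value as written by the mapper parses fine
        System.out.println(lastFieldAsBalance("Customer#000008795,9794.8"));

        // The reducer's own output does not: with no comma, the whole
        // "avg count" string reaches Float.parseFloat, which rejects it
        try {
            lastFieldAsBalance(reducerOutput(8991.715f, 289));
        } catch (NumberFormatException e) {
            // The message quotes the whole "avg count" string
            System.out.println(e.getMessage());
        }
    }
}
```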

Thanks in advance.

【Question comments】:

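  • Can you try printing out value? It might not contain enough | characters to split into a 7-element array.
  • This question is too broad. Where is the minimal reproducible example? If you have spent hours on this, can you spend a few minutes adding more information? What is the problem, what are the input and output... And of course, don't post anything with short URLs. I won't click those links, mainly because the firewall here blocks them...
  • Could you provide sample input data?
  • Have you considered using Hive, Pig, or Spark for this?
  • I don't know. Should I?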

Tags: java hadoop mapreduce mapper


【Solution 1】:

Pig runs on MapReduce (if configured that way). It is also cleaner than writing MapReduce by hand, and it ships with the major Hadoop distributions.

A = LOAD 'test.txt' USING PigStorage('|') AS (f1:int,customer_key:chararray,f3:chararray,nation:int,f5:chararray,balance:float,segment:chararray,f7:chararray);
filtered = FILTER A BY balance > 8000 AND (nation > 1 AND nation < 15);
X = FOREACH filtered generate segment,customer_key,balance;

Then output it:

\d X
(BUILDING,Customer#000008795,9794.8)

I'm not sure you really want an average here, since there is only one element. But you would need to GROUP BY segment and customer_key, and then you can easily use the AVG function.
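For comparison, the per-segment result the question asks for (number of customers and average balance) corresponds to a GROUP BY followed by COUNT and AVG. The sketch below is an in-memory, plain-Java version of that aggregation using Collectors.groupingBy with Collectors.summarizingDouble. It groups by segment only, which is the per-segment variant. The answer's query also groups by customer_key. The Row holder class and the extra sample rows are hypothetical.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class SegmentStats {
    // One already-filtered customer row (hypothetical holder type)
    static final class Row {
        final String segment;
        final String customerKey;
        final double balance;

        Row(String segment, String customerKey, double balance) {
            this.segment = segment;
            this.customerKey = customerKey;
            this.balance = balance;
        }
    }

    // GROUP BY segment, then COUNT and AVG(balance) for each group
    static Map<String, DoubleSummaryStatistics> bySegment(List<Row> rows) {
        return rows.stream().collect(Collectors.groupingBy(
            (Row r) -> r.segment,
            TreeMap::new,
            Collectors.summarizingDouble((Row r) -> r.balance)));
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("BUILDING", "Customer#000008795", 9794.80),
            new Row("BUILDING", "hypothetical-1", 8200.00),
            new Row("HOUSEHOLD", "hypothetical-2", 9000.00));
        bySegment(rows).forEach((segment, s) ->
            System.out.println(segment + " count=" + s.getCount() + " avg=" + s.getAverage()));
    }
}
```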


If you are more familiar with SQL, Hive may be a more direct approach.

(It also runs on MapReduce unless configured otherwise.)

-- LOCATION is the HDFS directory that contains test.txt (external tables point at a directory)
CREATE EXTERNAL TABLE IF NOT EXISTS records (
    f1 INT,
    customer_key STRING,
    f3 STRING,
    nation INT,
    f5 STRING,
    balance FLOAT,
    segment STRING,
    f8 STRING
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
LOCATION 'hdfs://path/';

Then the query should look something like this:

SELECT segment, customer_key, AVG(balance)
FROM records
WHERE balance > 8000 AND nation > 1 AND nation < 15
GROUP BY segment, customer_key;

I could go into an Apache Spark example, but Spark SQL would essentially be the same Hive query.

【Discussion】:

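  • Thanks for all the help, but I'm supposed to implement it using MapReduce only.
  • Sorry, I don't see why you would write 50+ lines of code when you can do it in 10 or fewer.
  • This is a small program for my project. The professor wants it developed in MapReduce so that we learn it better :)
  • OK. I would check for null values before sending the text to the reducer.
  • Or for no input at all... If you were getting nulls, you would see a NullPointerException, not an out-of-bounds error. Also, you didn't mention how you are running the job, but I assume hadoop jar.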

【Solution 2】:

If you really want to do this in Java MapReduce, try to normalize your input and catch errors early.

Return early to drop problem records:

     // Reusable output objects (fields of the Mapper class)
     private final Text segment = new Text();
     private final Text word = new Text();

     @Override
     public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
       String[] line = value.toString().split("\\|");
       // Drop records that do not have enough pipe-separated fields
       if (line.length < 7) {
           System.err.println("map: Not enough fields");
           return;
       }
       String custKey = line[1];
       int nation;
       float balance;
       try {
           nation = Integer.parseInt(line[3].trim());
           balance = Float.parseFloat(line[5].trim());
       } catch (NumberFormatException e) {
           // Drop records whose nation or balance is not a number
           e.printStackTrace();
           return;
       }

       if (balance > 8000 && nation > 1 && nation < 15) {
         segment.set(line[6]);
         // Value format: customer key, tab, balance
         word.set(custKey + "\t" + balance);
         context.write(segment, word);
       }
  }

Then, if you are writing output records of a similar shape, the reducer should ideally produce the same format:

@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        float sumBalance = 0;
        int count = 0;

        for (Text v : values) {
           // Expect "customerKey \t balance", the format the mapper writes
           String[] a = v.toString().trim().split("\t");
           if (a.length < 2) {
               System.err.println("reduce: Not enough fields");
               continue;
           }

           sumBalance += Float.parseFloat(a[1]);
           count++;
        }

        // Avoid dividing by zero if every value was skipped
        float avgBalance = count == 0 ? 0 : sumBalance / count;

        context.write(key, new Text(avgBalance + "\t" + count));
 }

(Untested code)
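Keeping the same format matters most if the job also uses a combiner. That is an assumption, since the driver is not shown. Whatever a combiner emits is fed back into the reducer, and an average of averages is generally not the overall average. One common way to stay safe is to pass partial (sum, count) pairs and divide only in the final step. The plain-Java sketch below assumes a hypothetical value format of "sum\tcount", with the mapper emitting "balance\t1". The class name PartialAvg is also hypothetical.

```java
import java.util.List;

class PartialAvg {
    // Merges values of the form "sum\tcount" into one partial of the same form.
    // Input and output share a format, so this step can safely run any number of times.
    static String combine(Iterable<String> values) {
        double sum = 0;
        long count = 0;
        for (String v : values) {
            String[] a = v.trim().split("\t");
            if (a.length < 2) {
                continue; // skip malformed values
            }
            sum += Double.parseDouble(a[0]);
            count += Long.parseLong(a[1]);
        }
        return sum + "\t" + count;
    }

    // Final step (reducer only): turn the merged partial into an average
    static double average(String partial) {
        String[] a = partial.split("\t");
        long count = Long.parseLong(a[1]);
        return count == 0 ? 0 : Double.parseDouble(a[0]) / count;
    }

    public static void main(String[] args) {
        // Two combiner passes over map output, each value being "balance\t1"
        String p1 = combine(List.of("9000.0\t1", "10000.0\t1"));
        String p2 = combine(List.of("8000.0\t1"));
        // The reducer merges the partials and only then divides
        String total = combine(List.of(p1, p2));
        System.out.println(average(total)); // prints 9000.0
    }
}
```

The count travels with the sum, so the reducer can also report how many customers each segment has.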

【Discussion】:
