【问题标题】:What determines the number of reducers and how to avoid bottlenecks regarding reducers?什么决定了减速器的数量以及如何避免减速器的瓶颈?
【发布时间】:2013-10-25 01:20:01
【问题描述】:

假设我有一个包含此类信息的大 tsv 文件:

2012-09-22 00:00:01.0   249342258346881024  47268866    0   0   0   bo
2012-09-22 00:00:02.0   249342260934746115  1344951     0   0   4   ot
2012-09-22 00:00:02.0   249342261098336257  346095334   1   0   0   ot
2012-09-22 00:05:02.0   249342261500977152  254785340   0   1   0   ot

我想实现一个 MapReduce 作业,它枚举五分钟的时间间隔并过滤 tsv 输入的一些信息。输出文件如下所示:

0 47268866  bo
0 134495    ot
0 346095334 ot
1 254785340 ot

key是区间的编号,例如0是2012-09-22 00:00:00.02012-09-22 00:04:59之间区间的引用。

我不知道这个问题是否不适合 MapReduce 方法,或者我是否认为它不正确。在 map 函数中,我只是将时间戳作为键传递,将过滤后的信息作为值传递。在 reduce 函数中,我通过使用全局变量来计算间隔并产生提到的输出。

i. 框架是自动确定reducer 的数量还是由用户定义?使用一个 reducer,我认为我的方法没有问题,但我想知道在处理非常大的文件时,一个 reduce 是否会成为瓶颈,可以吗?

ii.如何使用多个减速器解决这个问题?

任何建议将不胜感激! 提前致谢!

编辑:

@Olaf 回答了第一个问题,但第二个问题仍然让我对并行性有一些疑问。我的地图函数的地图输出目前是这样的(我只是以分钟精度传递时间戳):

2012-09-22 00:00   47268866    bo
2012-09-22 00:00   344951      ot
2012-09-22 00:00   346095334   ot
2012-09-22 00:05   254785340   ot

因此,在 reduce 函数中,我收到输入,键表示收集信息的时间,值表示信息本身,我想枚举从 0 开始的五分钟间隔。我目前正在使用全局变量来存储间隔的开始,当一个键推断它时,我正在递增间隔计数器(这也是一个全局变量)。

代码如下:

private long stepRange = TimeUnit.MINUTES.toMillis(5);
private long stepInitialMillis = 0;
private int stepCounter = 0;

@Override
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

    long millis = Long.valueOf(key.toString());
    if (stepInitialMillis == 0) {
        stepInitialMillis = millis;
    } else {
        if (millis - stepInitialMillis > stepRange) {
            stepCounter = stepCounter + 1;
            stepInitialMillis = millis;
        }
    }
    for (Text value : values) {
        context.write(new Text(String.valueOf(stepCounter)),
                new Text(key.toString() + "\t" + value));
    }
}

因此,如果使用多个 reducer,我的 reduce 函数将在两个或多个节点上运行,在两个或多个 JVM 中,我将失去全局变量赋予的控制权,我没有考虑针对我的情况的解决方法.

【问题讨论】:

    标签: java hadoop mapreduce


    【解决方案1】:

    reducer 的数量取决于集群的配置,尽管您可以限制 MapReduce 作业使用的 reducer 的数量。

    如果您要处理大量数据,单个 reducer 确实会成为 MapReduce 作业的瓶颈。

    Hadoop MapReduce 引擎保证与同一个键关联的所有值都发送到同一个 reducer,因此您的方法应该适用于多个 reducer。见雅虎!教程详情:http://developer.yahoo.com/hadoop/tutorial/module4.html#listreducing

    编辑:为了保证同一时间间隔的所有值都进入同一个reducer,您必须使用时间间隔的一些唯一标识符作为键。您必须在映射器中执行此操作。我正在再次阅读您的问题,除非您想以某种方式聚合对应于相同时间间隔的记录之间的数据,否则您根本不需要任何减速器。

    编辑:正如@SeanOwen 指出的,reducer 的数量取决于集群的配置。通常配置为每个节点的最大任务数乘以数据节点数的 0.95 到 1.75 倍。如果集群配置中没有设置mapred.reduce.tasks值,则默认reducer个数为1。

    【讨论】:

    • 感谢您的快速回答!关于第二个问题,我编辑了我的帖子以更好地解释我的问题。
    • @JoãoMelo:我再次阅读了您的问题并编辑了我的答案。您想要的功能最好在映射器中实现。
    • 我同意你的观点,但在我的方法中,间隔计数器从文件的第一个时间戳开始,在这种情况下是第一个块的第一个时间戳。我想这是不可能的,因为每个节点都会使用不同的块和不同的全局变量运行一个实例,对吗?
    • @JoãoMelo:我相信您的评估是正确的。如果不读取整个文件集合,就没有找到最早时间戳的好方法。您是否可以使用某个绝对值作为时间间隔标识符,而不需要知道整个输入中最早的时间戳?
    • hadoop.apache.org/docs/stable/mapred-default.html 和几年的经验... mapred.reduce.tasks 默认为 1,我从未听说过 在 Hadoop 中为您设置了其他值。当然,代表您使用 Hadoop 的服务可能会为您进行调整。
    【解决方案2】:

    您似乎希望按 5 分钟的时间块汇总一些数据。使用 Hadoop 的 Map-reduce 非常适合这类事情!应该没有理由使用任何“全局变量”。以下是我的设置方式:

    映射器读取 TSV 的一行。它获取时间戳,并计算它属于哪个五分钟存储桶。将其制成一个字符串,并将其作为键发出,例如“20120922:0000”、“20120922:0005”、“20120922:0010”等. 至于与该键一起发出的值,只需保持简单,然后将整个制表符分隔的行作为另一个 Text 对象发送。

    既然 mapper 已经确定了数据需要如何组织,那么 reducer 的工作就是进行聚合。每个 reducer 都会获得一个密钥(一个五分钟的存储桶),以及适合该存储桶的所有行的列表。它可以遍历该列表,并从中提取所需的任何内容,并根据需要将输出写入上下文。

    至于映射器,让 hadoop 弄清楚那部分。将 reducer 的数量设置为集群中的节点数,作为起点。应该可以正常运行。

    希望这会有所帮助。

    【讨论】:

      猜你喜欢
      • 2019-02-23
      • 2019-01-16
      • 1970-01-01
      • 2017-08-21
      • 1970-01-01
      • 2013-11-09
      • 2019-04-24
      • 2014-09-20
      • 2013-04-30
      相关资源
      最近更新 更多