【问题标题】:Python - implementing join in MapReduce - problems with reducer outputPython - 在 MapReduce 中实现连接 - 减速器输出问题
【发布时间】:2013-05-30 05:34:52
【问题描述】:

这是我在 Coursera 上做的数据科学课程中的硬件任务的求助电话,因为我无法在 Coursera 论坛上获得任何建议。我已经编写了代码,但不幸的是输出没有返回预期的结果。这是手头的问题:

任务:将关系连接实现为 MapReduce 查询

输入(映射器):

输入将是格式化为字符串列表的数据库记录。 每个列表元素对应于其对应记录中的不同字段。 每条记录中的第一项(索引 0)是一个字符串,用于标识该记录来自哪个表。该字段有两个可能的值:

  1. ‘line_item’表示该记录是一个行项目。 2.‘order’表示该记录是一个订单。

每条记录中的第二个元素(索引 1)是 order_id。 LineItem 记录有 17 个元素,包括标识符字符串。 订单记录有 10 个元素,包括标识符字符串。

输出(减速器):

输出应该是一个连接的记录。

结果应该是长度为 27 的单个列表,其中包含来自订单记录的字段,后跟来自行项目记录的字段。每个列表元素都应该是一个字符串。

我的代码是:

import MapReduce
import sys

"""
Word Count Example in the Simple Python MapReduce Framework
"""

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

record = open(sys.argv[1]) # this read input, given by instructor

def mapper(record):
key = record[1] # assign order_id from each record as key
value = list(record) # assign whole record as value for each key
mr.emit_intermediate(key, value) # emit key-value pairs

def reducer(key, value):
    new_dict = {} # create dict to keep track of records
    if not key in new_dict:
        new_dict[key] = value
    else:
        new_dict[key].extend(value)
    for key in new_dict:
        if len(new_dict[key]) == 27:
            mr.emit(new_dict[key])

# Do not modify below this line
# =============================
if __name__ == '__main__':
  inputdata = open(sys.argv[1])
  mr.execute(inputdata, mapper, reducer)

我收到的错误消息是“预期:31 条记录,得到 0”。

此外,预期的输出记录应该是这样的——只有一个列表,所有记录集中在一起,没有任何重复数据删除。

["order", "5", "44485", "F", "144659.20", "1994-07-30", "5-LOW", "Clerk#000000925", "0", "quickly. bold deposits sleep slyly. packages use slyly", "line_item", "5", "37531", "35", "3", "50", "73426.50", "0.08", "0.03", "A", "F", "1994-08-08", "1994-10-13", "1994-08-26", "DELIVER IN PERSON", "AIR", "eodolites. fluffily unusual"]

抱歉,问题太长了,有点乱,但我希望答案对某人来说是显而易见的。

对我有用的类似代码:

def mapper(record):
    # key: document identifier
    # value: document contents
    friend = record[0]
    value = 1
    mydict = {}
    mr.emit_intermediate(friend, value)
    mydict[friend] = int(value)


def reducer(friend, value):
    # key: word
    # value: list of occurrence counts
    newdict = {}
    if not friend in newdict:
        newdict[friend] = value
    else:
    newdict[friend] = newdict[friend] + 1
    for friend in newdict:
    mr.emit((friend, (newdict[friend])))

谢谢! 谢尔盖

【问题讨论】:

  • 代码 sn-p 看起来不完整。它以定义record 的缩进行开头,并且有对mr 的引用,但是之前没有提到该项目(模块?类实例?)。也没有迹象表明您如何使用此代码,尽管第一行让我认为您是从带有参数的命令行调用它。如果是这种情况,那么如何调用函数?如果您可以分享有关此的更多详细信息以及输入数据的示例,这将有助于澄清情况。
  • 谢谢贾斯汀!我在开头和结尾添加了缺失的部分代码。它们只是进口产品,但这些不太可能导致问题的部分,因为它对我来说在其他问题中起作用。最好的!
  • 哪一行产生了错误信息?您可以发布完整的回溯吗?我有一种预感,我知道出了什么问题,但我觉得回溯仍然会有所帮助。
  • @JustinSBarrett 不幸的是,这就是我所掌握的关于错误的所有信息,因为我正在使用 Course VM,并且只有在使用我的文档上传 Python 文件后才能得到响应。但从响应来看,我认为问题出在reduce函数的某个地方,它没有返回正确的输出。谢谢!

标签: python mapreduce jointable reducers


【解决方案1】:

其实你不必使用new_dict。由于您必须打印“加入”并且您知道 orders 在您的值列表中始终位于索引 0 中,并且列表的其余部分是 line_item 此代码应该这样做:

import MapReduce
import sys

"""
Word Count Example in the Simple Python MapReduce Framework
"""

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

def mapper(record):
    key = record[1] # assign order_id from each record as key
    value = list(record) # assign whole record as value for each key
    mr.emit_intermediate(key, value) # emit key-value pairs

def reducer(key, value):
    for index in range (1, len(value)):
        mr.emit(value[0] + value[index])

# Do not modify below this line
# =============================
if __name__ == '__main__':
  inputdata = open(sys.argv[1])
  mr.execute(inputdata, mapper, reducer)

【讨论】:

    【解决方案2】:

    我发现这段代码有一些问题。首先是这一行:

    record = open(sys.argv[1])
    

    我觉得奇怪的是,这个 record 变量从未在代码的其他任何地方使用过。即使mapper函数定义如下:

    def mapper(record):
        ...
    

    ...那个 recordmapper 函数的本地函数。它与第一个 record 的范围不同。传递给mapper 的任何数据都将分配给其本地record 并相应地使用,并且永远不会触及分配给第一个record 的文件对象。不过,我不认为这与错误有关。因为第一个 record 没有在其他任何地方使用,所以您可以非常安全地删除该行。

    然后是reducer函数:

    def reducer(key, value): # reducer should take 2 inputs according to the task
        if key in new_dict: # checking if key already added to dict
            new_dict[key].extend(list(value)) # if yes just append all records to the value
        new_dict[key] = list(value) # if not create new key and assign record to value
        for key in new_dict:
            if len(new_dict[key]) == 27: # checks to emit only records found in both tables
                mr.emit(new_dict[key])
    

    您自己的 cmets 在这里提供了问题的线索。首先,您说您正在检查密钥是否已经在字典中。如果是这样,只需将所有记录附加到该值。如果没有,则创建一个新键并将记录分配给该值。

    问题在于与“如果不是”注释关联的行。如果第一个if 测试失败时确实应该这样做,那么它应该以else 行开头:

        ...
        if key in new_dict: # checking if key already added to dict
            new_dict[key].extend(list(value)) # if yes just append all records to the value
        else:
            new_dict[key] = list(value) # if not create new key and assign record to value
        ...
    

    按照您编写它的方式,即使 if 测试成功并将数据附加到键的现有值,它也会立即踩下该更改。换句话说,该键的值不会增长。它总是代表最近提交的键值。

    这是完整的代码,包含所有建议的更改:

    import MapReduce
    import sys
    
    """
    Word Count Example in the Simple Python MapReduce Framework
    """
    
    mr = MapReduce.MapReduce()
    
    # =============================
    # Do not modify above this line
    
    def mapper(record):
        key = record[1] # assign order_id from each record as key
        value = list(record) # assign whole record as value for each key
        mr.emit_intermediate(key, value) # emit key-value pairs
    
    new_dict = {} # create dict to keep track of records
    
    def reducer(key, value):
        if not key in new_dict:
            new_dict[key] = value
        else:
            new_dict[key].extend(value)
        for key in new_dict:
            if len(new_dict[key]) == 27:
                mr.emit(new_dict[key])
    
    # Do not modify below this line
    # =============================
    if __name__ == '__main__':
      inputdata = open(sys.argv[1])
      mr.execute(inputdata, mapper, reducer)
    

    【讨论】:

    • 感谢您指出我缺少 else 语句的错误。我已经按照你的指示解决了这个问题。唉,我仍然遇到同样的错误。我认为这可能是因为我的条件(如果 len(new_dict[key]) == 27),所以我删除了它,但程序仍然返回 0 值。为了比较,我还发布了类似任务的代码,这对我来说效果很好,请参阅下面的下一条评论。
    • 糟糕,我实际上无法将代码作为评论发布,因此我在最初的问题描述末尾添加了该代码。再次感谢贾斯汀!
    • 我刚刚编辑了答案以添加有关新代码的响应。也许这样可以解决问题?
    • 好吧,在这个任务中我只发出值,没有键,因为这是问题中要求的:“结果应该是一个长度为 27 的列表,其中包含来自订单记录的字段后跟来自行项目记录的字段。每个列表元素都应该是一个字符串。这是我上面的任务定义中的引用。这里的问题似乎是,截至目前,我的减速器功能生产者 0 输出,预计会在哪里产生一些..贾斯汀,感谢您与我一起研究这个问题。如果您有更多想法,请告诉我。至少我正在学习中:)
    • 我刚刚注意到您将new_dict 的初始化移到了 reducer 函数中。这肯定会导致事情失败,因为每次调用该函数时都会从一个空字典重新开始。
    【解决方案3】:

    不知道它是否仍然相关,只是想指出,正如 Justin 所建议的,每次调用 reducer 时都会重置 new_dict。一种方法是发出 2 个键值对。 说你有 2 行 -

    1. order_id, line_item_id,Order_name

      1    ,      2      ,   'abc'
      
    2. line_item_id,line_item_location

      2        ,     'xyz'
      

    我们希望输出为 -

     1,2,'abc','xyz'
    

    从映射器发出 2 个键值对,以连接列作为公共键 -

    (2,[1,'abc']) and (2,['xyz'])
    

    在 reducer 中,输入将是 -

    (2,[[1,'abc'],['xyz']])
    

    从那里开始尝试操作数据以获得所需的输出。

    (供参考 - 我正在为我的 M/R 工作使用 python dumbo 框架)

    【讨论】:

      【解决方案4】:
      import MapReduce
      import sys
      
      mr = MapReduce.MapReduce()
      new_dict = {}
      
      def mapper(record):
         order_id = record[1]
          mr.emit_intermediate(order_id, list(record))
      
      
      def reducer(key, list_of_lists):
          for lyst in list_of_lists:
          order_id = lyst[1]
          if not (order_id in new_dict):
          new_dict[order_id] = lyst
          else:
          new_dict[order_id].extend(lyst)
          mr.emit(new_dict[order_id])
      
      if __name__ == '__main__':
          inputdata = open(sys.argv[1])
          #inputdata = open("records.json")  
          mr.execute(inputdata, mapper, reducer)
      

      【讨论】:

      • 在您的答案中添加代码解释。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-02-15
      • 2011-07-25
      • 2020-11-24
      • 2014-05-21
      • 2016-07-02
      • 2016-08-13
      • 2012-10-25
      相关资源
      最近更新 更多