【问题标题】:What does flatmap in apache beam exactly do to the input pcollection when returning to the caller?返回调用者时,apache Beam 中的 flatmap 对输入 pcollection 究竟做了什么?
【发布时间】:2019-05-02 09:58:55
【问题描述】:

我正在使用 apache beam 2.10 并试图了解 flatmap 在将 pcollection 返回给调用者时到底在做什么。

阅读在线文档的解释后,我认为 flatMap 只是将 PCollection 元素集拆分为单个 pcollection,就像在 this 示例中一样。

callable 必须为输入的每个元素返回一个 iterable 收集。这些迭代的元素将被展平为 输出 PCollection。

但在以下代码的情况下,flatMap 会展平“Hello World”的每个字符,而不是整体返回“Hello World”。

def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))

    # Just return
    return x

def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])

        #
        # Sample with FlatMap
        #
        ( elems | 'flatmap:exec'   >> beam.FlatMap(simple) # 1 to 1 relation with flatmap
            | 'flatmap:out'    >> beam.io.textio.WriteToText(file_path_prefix='flatmap')
        )

def main():
    run()

if __name__ == "__main__":
  main()

结果

H
e
l
l
o

W
o
r
l
d

但是当我返回一个 generator 迭代结果时,就像在这个 code 中一样,它似乎变成了 pcollection。

def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))

    # Just return
    yield x

def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])

        #
        # Sample with FlatMap
        #
        ( elems | 'flatmap:exec'   >> beam.FlatMap(simple) # 1 to 1 relation with flatmap
            | 'flatmap:out'    >> beam.io.textio.WriteToText(file_path_prefix='flatmap')
        )

def main():
    run()

if __name__ == "__main__":
  main()

结果

Hello World

添加于日本标准时间 2019 年 5 月 2 日

如果我从字面上返回一个 iterator,那么将迭代一个字符串,并将每个字符展平为 PCollections。

def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))

    # Just return
    return iter(x)

def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])

        #
        # Sample with Map
        #
        ( elems | 'flatmap:exec'   >> beam.FlatMap(simple) # 1 to 1 relation with flatmap
            | 'flatmap:out'    >> beam.io.textio.WriteToText(file_path_prefix='flatmap')
        )

def main():
    run()

if __name__ == "__main__":
  main()

结果

H
e
l
l
o

W
o
r
l
d

那么当将 PCollection 返回给调用者函数时 flatMap 究竟做了什么?

【问题讨论】:

    标签: python apache-beam


    【解决方案1】:

    FlatMap 假定给定函数的返回类型是iterable。在您的第一个示例中,simple 返回"Hello World"。如iterable"Hello World"可以认为是['H','e','l','l','o',' ','W','o','r','l','d']。因此,您的第一个示例如下所示:

    1. [] -> create -> ["Hello World"]
    2. ["Hello World"] -> map -> [['H','e','l','l','o',' ','W','o','r','l','d']]
    3. [['H','e','l','l','o',' ','W','o','r','l','d']] -> flatten -> ['H','e','l','l','o',' ','W','o','r','l','d']

    最终的 PCollection:['H','e','l','l','o',' ','W','o','r','l','d']

    然而,在您的第二个示例中,simple 产生x。您可以认为simple 返回包含单个元素xiterator。所以你的第二个例子是这样的:

    1. [] -> create -> ["Hello World"]
    2. ["Hello World"] -> map -> [["Hello World"]]
    3. [["Hello World"]] -> flatten -> ["Hello World"]

    最终的 PCollection:["Hello World"]

    回答你的最后一个问题:

    yield xreturn iter(x) 在语义上是不同的。下面的例子可以帮助你理解上下文。

    >>> list(iter("abc"))
    ['a', 'b', 'c']
    >>> def x(): yield "abc"
    >>> list(x())
    ['abc']
    

    【讨论】:

    • 感谢您发布答案。我同意你的第一点,但我不认为返回一个 iterator 是正确的理解方式,因为如果我真的通过 iter() 返回迭代器,那么仍然是“Hello World”的每个字符" 被展平为 PCollections。
    猜你喜欢
    • 1970-01-01
    • 2011-07-31
    • 1970-01-01
    • 1970-01-01
    • 2012-07-23
    • 2016-09-10
    • 2023-03-15
    • 2012-10-17
    • 2021-06-04
    相关资源
    最近更新 更多