【Posted】: 2019-05-02 09:58:55
【Question】:
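I am using Apache Beam 2.10 and trying to understand what exactly FlatMap does when it returns a PCollection to the caller.
After reading the explanation in the online documentation, I thought FlatMap simply splits the set of PCollection elements into a single PCollection, as in this example.
The documentation says: "The callable must return an iterable for each element of the input PCollection. The elements of these iterables will be flattened into the output PCollection."
But with the following code, FlatMap flattens every character of "Hello World" instead of returning "Hello World" as a whole.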
import logging

import apache_beam as beam

def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))
    # Return the input element unchanged
    return x

def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])
        #
        # Sample with FlatMap
        #
        (elems
         | 'flatmap:exec' >> beam.FlatMap(simple)  # 1 to 1 relation with flatmap
         | 'flatmap:out' >> beam.io.textio.WriteToText(file_path_prefix='flatmap'))

def main():
    run()

if __name__ == "__main__":
    main()
Result
H
e
l
l
o
W
o
r
l
d
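The per-character output above can be reproduced without Beam. The following is a minimal plain-Python sketch, assuming FlatMap behaves as the quoted documentation describes: it iterates over whatever the callable returns and emits each item. The helper `flat_map_model` is a hypothetical stand-in written for illustration, not Beam's implementation.

```python
from itertools import chain


def flat_map_model(fn, elements):
    """Hypothetical model of FlatMap: call fn on each element and
    concatenate the iterables it returns into one flat list."""
    return list(chain.from_iterable(fn(e) for e in elements))


def simple(x):
    # A str is itself an iterable of one-character strings,
    # so returning it unchanged hands the model 11 separate items.
    return x


chars = flat_map_model(simple, ["Hello World"])
print(len(chars))  # 11 items, including the space
print(chars[:5])   # ['H', 'e', 'l', 'l', 'o']
```

Under this model, nothing special happens to strings: they are simply treated like any other iterable.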
But when I return a generator (by using yield), as in this code, the whole string seems to end up in the PCollection as a single element.
import logging

import apache_beam as beam

def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))
    # Yield the input element; this makes simple a generator function
    yield x

def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])
        #
        # Sample with FlatMap
        #
        (elems
         | 'flatmap:exec' >> beam.FlatMap(simple)  # 1 to 1 relation with flatmap
         | 'flatmap:out' >> beam.io.textio.WriteToText(file_path_prefix='flatmap'))

def main():
    run()

if __name__ == "__main__":
    main()
Result
Hello World
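The single-line result is consistent with how Python generator functions work, independent of Beam. A small sketch in plain Python: calling a function that contains `yield` returns a generator object, and iterating over that generator produces exactly one item here, the whole string. The name `simple_list` is a hypothetical variant added only for comparison.

```python
import inspect


def simple(x):
    # Generator function: yields the whole string exactly once.
    yield x


def simple_list(x):
    # Hypothetical variant: a one-element list wraps the string the same way.
    return [x]


gen = simple("Hello World")
is_gen = inspect.isgenerator(gen)   # the call returns a generator object
items = list(gen)                   # iterating it gives one item
same = list(simple_list("Hello World"))

print(is_gen, items, same)
```

Either form gives the consumer an iterable whose only item is the string, which is presumably why the output keeps "Hello World" intact.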
Added on May 2, 2019 (JST)
If I literally return an iterator, the string is iterated over and each character is flattened into the PCollection.
import logging

import apache_beam as beam

def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))
    # Return an iterator over the input element
    return iter(x)

def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])
        #
        # Sample with FlatMap
        #
        (elems
         | 'flatmap:exec' >> beam.FlatMap(simple)  # 1 to 1 relation with flatmap
         | 'flatmap:out' >> beam.io.textio.WriteToText(file_path_prefix='flatmap'))

def main():
    run()

if __name__ == "__main__":
    main()
Result
H
e
l
l
o
W
o
r
l
d
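That `iter(x)` gives the same per-character output as `return x` matches ordinary Python semantics. A brief sketch, assuming only the standard library: an iterator over a string walks its characters, whereas an iterator over a one-element list walks that single element.

```python
x = "Hello World"

# iter() over a str yields its characters, just like iterating the str directly.
from_iter = list(iter(x))
from_str = list(x)

# iter() over a one-element list yields the string as a single item.
whole = list(iter([x]))

print(from_iter == from_str)  # True
print(whole)                  # ['Hello World']
```

If the intent is one output element per input element, `beam.Map` is the transform designed for that case; with `FlatMap`, wrapping the value (for example `[x]` or `yield x`) keeps it whole.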
So what exactly does FlatMap do when it returns a PCollection to the caller?
【Comments】:
Tags: python apache-beam