【发布时间】:2021-08-12 08:35:28
【问题描述】:
我使用 beam 在本地构建并成功运行了一条令人满意的管道,我准备将作业发送到 DataFlow。
我计划只使用save_main_session 管道选项腌制我的会话,但是在尝试这样做时遇到了递归错误。经过几次反复试验,我设法将其范围缩小到我定义 ptransform_fn 的方式,并使用装饰器。
请在下面找到一个最小的可重现示例
# my_script.py
from typing import Set
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.transforms.ptransform import ptransform_fn
@ptransform_fn
def my_function(pcoll):
return pcoll | beam.Create([1])
if __name__ == "__main__":
options = PipelineOptions()
options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=options) as p:
p | my_function()
完整的回溯很长,但以RecursionError: maximum recursion depth exceeded while calling a Python object结尾
(请注意,这是启用此错误的save_main_session=True)选项,因此我可以使用本地运行器运行此python -m my_script,并将运行到RecursionError)
由于ptransform_fn 实际上使my_function 以“非pythonic”方式运行(在没有定义参数的情况下调用),所以pickler 库似乎对此有问题。
所以我最后的问题是:
- 这是预期的行为吗?如果我想使用 save_main_session ,我应该坚持定义继承自 Beam.PTransform 的类吗?
- 是否有一种简单的方法(如设置管道选项)能够腌制此脚本并在数据流上运行它?
【问题讨论】:
标签: python python-3.x apache-beam