【问题标题】:Dataflow Error: 'Clients have non-trivial state that is local and unpickleable'数据流错误:“客户端具有非平凡的本地和不可腌制的状态”
【发布时间】:2018-05-30 19:09:35
【问题描述】:

我有一个可以在本地执行而不会出现任何错误的管道。我曾经在本地运行的管道中遇到此错误

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

我相信我通过降级到 apache-beam=2.3.0 解决了这个问题 然后在本地它将完美运行。

现在我正在使用 DataflowRunner 并且在 requirements.txt 文件中我有以下依赖项

    apache-beam==2.3.0
    google-cloud-bigquery==1.1.0
    google-cloud-core==0.28.1
    google-cloud-datastore==1.6.0
    google-cloud-storage==1.10.0
    protobuf==3.5.2.post1
    pytz==2013.7

但我又遇到了这个可怕的错误

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

为什么它给了我 DataflowRunner 而不是 DirectRunner 的错误?他们不应该使用相同的依赖项/环境吗? 任何帮助,将不胜感激。

我已经读到这是解决它的方法,但是当我尝试它时,我仍然得到同样的错误

    class MyDoFn(beam.DoFn):

        def start_bundle(self, process_context):
            self._dsclient = datastore.Client()

        def process(self, context, *args, **kwargs):
        # do stuff with self._dsclient

来自https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

我之前在本地修复此问题的参考帖子:

Using start_bundle() in apache-beam job not working. Unpickleable storage.Client()

提前致谢!

【问题讨论】:

  • 你有堆栈跟踪吗?
  • DirectRunner 旨在在部署之前验证您的管道,并确保其在各种 Beam runner 中的稳健性。因此,如果 DataflowRunner 与 DirectRunner 在相同的 Beam 版本上运行,它应该可以在 DataflowRunner 上运行。你能分享一份堆栈跟踪或作业日志吗?
  • 在尝试从管道将数据写入 BigQuery 表时,我在 Dataflow 上遇到了类似的问题,尽管它是从 DirectRunner 运行的。有没有人在从 Dataflow 写入 BigQuery 时遇到过类似的问题。

标签: python google-cloud-dataflow pickle apache-beam


【解决方案1】:

start_bundle 方法中初始化unpickleable 客户端是正确的方法,Beam IO 经常遵循此方法,请参阅datastoreio.py 作为示例。这是一个在 DoFn 中使用 GCS python 客户端执行简单操作的管道。我在 Apache Beam 2.16.0 上运行它没有问题。如果您仍然可以重现您的问题,请提供更多详细信息。

gcs_client.py 文件:

import argparse
import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage

class MyDoFn(beam.DoFn):
  def start_bundle(self):
    self.storage_client = storage.Client()

  def process(self, element):
    bucket = self.storage_client.get_bucket("existing-gcs-bucket")
    blob = bucket.blob(str(int(time.time())))
    blob.upload_from_string("payload")
    return element

logging.getLogger().setLevel(logging.INFO)
_, options = argparse.ArgumentParser().parse_known_args()

pipeline_options = PipelineOptions(options)
p = beam.Pipeline(options=pipeline_options)
_ = p | beam.Create([None]) | beam.ParDo(MyDoFn())

p.run().wait_until_finish()

requirements.txt 文件:

google-cloud-storage==1.23.0

命令行:

python -m gcs_client \
    --project=insert_your_project \
    --runner=DataflowRunner \
    --temp_location gs://existing-gcs-bucket/temp/ \
    --requirements_file=requirements.txt \
    --save_main_session

【讨论】:

    【解决方案2】:

    在让 Dataflow 将一堆行写入 Bigtable 时,我遇到了类似的问题。将--save-main-session 设置为False 似乎已经解决了。

    【讨论】:

    • 将 --save-main-session 设置为 False 对我不起作用
    猜你喜欢
    • 2020-11-18
    • 2020-05-03
    • 2011-07-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多