【问题标题】:Why does custom Python object cannot be used with ParDo Fn?为什么自定义 Python 对象不能与 ParDo Fn 一起使用?
【发布时间】:2019-09-13 07:51:41
【问题描述】:

我目前还不熟悉在 Python 中将 Apache Beam 与 Dataflow runner 一起使用。我有兴趣创建一个发布到 Google Cloud PubSub 的批处理管道,我已经修改了 Beam Python API 并找到了一个解决方案。然而,在我的探索过程中,我遇到了一些有趣的问题,让我很好奇。

1。成功的管道

目前,我从 GCS 批量发布数据的成功光束管道如下所示:

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        lambda data: MessageToJson(
                            proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )

2。不成功的管道

在这里,我试图让发布者在DoFn 之间共享。我尝试了以下方法。

一个。在 DoFn 中初始化发布者

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
             max_bytes=1024,  # One kilobyte
             max_latency=1,  # One second
         )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b.在 DoFn 之外初始化 Publisher,并将其传递给 DoFn

class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    .... ## same as 1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ... # same as 1
        (normalized_data | 
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )

使发布者在DoFn 方法之间共享的两次尝试均失败,并显示以下错误消息:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

我的问题是:

  1. 共享发布者的实施会提高光束管道的性能吗?如果是,那么我想探索这个解决方案。

  2. 为什么我的失败管道会出现错误?是因为在 process 函数之外初始化自定义类对象并将其传递给 DoFn 吗?如果是由于这个原因,我该如何实现一个管道,以便能够在 DoFn 中重用自定义对象?

谢谢您,我们将不胜感激。

编辑:解决方案

好的,所以 Ankur 已经解释了为什么会出现我的问题,并讨论了如何在 DoFn 上进行序列化。基于这些知识,我现在了解到有两种解决方案可以在 DoFn 中使自定义对象共享/可重用:

  1. 使自定义对象可序列化:这允许对象在 DoFn 对象创建期间被初始化/可用(在__init__ 下)。该对象必须是可序列化的,因为它将在创建 DoFn 对象的管道提交期间被序列化(调用__init__)。如何实现这一点在我的回答中得到了回答。此外,我发现此要求实际上与 [1][2] 下的 Beam 文档相关联。

  2. __init__ 之外的 DoFn 函数中初始化不可序列化对象以避免序列化,因为在管道提交期间不会调用 init 之外的函数。 Ankur 的回答中解释了如何做到这一点。

参考资料:

[1]https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

[2]https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

【问题讨论】:

    标签: python google-cloud-dataflow apache-beam


    【解决方案1】:

    PublisherClient 无法正确腌制。更多关于酸洗here。 在process方法中初始化PublisherClient避免了PublisherClient的酸洗。

    如果打算重用PublisherClient,我建议在process方法中初始化PublisherClient,并使用以下代码将其存储在self中。

    class PublishFn(beam.DoFn):
        def __init__(self, topic_path):
            self.topic_path = topic_path
            super(self.__class__, self).__init__()
    
        def process(self, element, **kwargs):
            if not hasattr(self, 'publish'):
                from google.cloud import pubsub_v1
                self.publisher = pubsub_v1.PublisherClient()
            future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
            return future.result()
    

    【讨论】:

    • 嗨@Ankur 谢谢你的解释。我现在看到问题是由于酸洗造成的。在 python Beam 实现中,除了 DoFn 的process 函数之外,所有的东西都是腌制的吗?
    • 不完全是,让我解释一下。 PublishFn(...) 创建PublishFn 的新对象,该对象被腌制,包括该对象的所有属性。 process 方法只是代码,将像该对象的任何其他方法一样被腌制,但只有当它们支持腌制时才能选取该对象的变量。长答案简短,如果您在PublishFn 中有另一种方法foo,那么它也将被视为类似于process,并且会像process 一样被正确腌制。
    • 嗨@Ankur,因为process 也是腌制的,我有一个后续问题。 publisher 是我在process 内的成功管道中的一个变量(或您的解决方案中的 PublishFn 属性),但为什么我们只能在 process 中将其初始化为 PublisherClient 实例而不是在 __init__ 中?是不是因为如果变量是不可腌制的,如果我们在 process 函数(或 DoFn 中的任何其他方法)中对其进行初始化,它就不会被腌制,而 __init__ 不是这种情况?
    • __init__ 代码在酸洗之前的对象创建时执行。后来这个对象被腌制。如果PublisherClient 在管道提交时是对象的一部分,那么它也会被腌制,这会导致事情失败。 process 在管道提交期间不会被调用,因此客户端不会成为对象的一部分,因此不会被腌制。
    • 非常感谢!我现在更好地理解了这个问题。所以根据我的理解,有两种解决方案可以在DoFn 中使自定义对象共享/可重用:1)使自定义对象可序列化:这允许对象在DoFn 对象创建期间被初始化/可用(__init__) , 这个对象在管道提交过程中也会被腌制 2) 在DoFn 之外的__init__ 的函数中初始化不可序列化的对象以避免腌制,因为在管道提交期间不会调用__init__ 之外的 fns
    【解决方案2】:

    感谢 Ankur,我发现这个问题是由于 python 中的酸洗问题。然后我尝试通过首先解决酸洗PublisherClient 的问题来隔离问题,并在Beam 上通过DoFn 共享PublisherClient 找到了解决方案。

    在python中,我们可以用dill包腌制自定义对象,我意识到这个包已经在Beam python实现中用于腌制对象。于是我尝试排查问题,发现了这个错误:

    TypeError: no default __reduce__ due to non-trivial __cinit__

    然后,我尝试修复此错误,我的管道现在可以正常工作了!

    解决方法如下:

    class PubsubClient(PublisherClient):
        def __reduce__(self):
            return self.__class__, (self.batch_settings,)
    
    # The DoFn to perform on each element in the input PCollection.
    class PublishFn(beam.DoFn):
        def __init__(self, topic_path):
            self.topic_path = topic_path
    
            from google.cloud import pubsub_v1
            batch_settings = pubsub_v1.types.BatchSettings(
                max_bytes=1024,  # One kilobyte
                max_latency=1,  # One second
            )
    
            self.publisher = PubsubClient(batch_settings=batch_settings)
            super(self.__class__, self).__init__()
    
        def process(self, element, **kwargs):
            future = self.publisher.publish(topic=self.topic_path, data=element.encode("utf-8"))
    
            return future.result()
    
    # ...the run_gcs_to_pubsub is the same as my successful pipeline
    

    解决方案的工作原理如下:首先,我从PublisherClient 继承并自己实现__reduce__ 函数。请注意,因为我只使用了batch_settings 属性来初始化我的PublisherClient,所以这个属性对于我的__reduce__ 函数来说已经足够了。然后我在__init__ 中将这个修改后的PublisherClient 用于我的DoFn。

    希望通过这个新解决方案,我的管道将获得性能提升。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-02-17
      • 1970-01-01
      • 2022-12-22
      • 2021-10-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-11-25
      相关资源
      最近更新 更多