【Question title】: Dataflow transform sending same input to output
【Posted】: 2021-06-25 00:18:23
【Question】:

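The goal of this pipeline is to understand how a PTransform works in a Pub/Sub-to-Pub/Sub Python pipeline. I send the input below, but the output Pub/Sub topic receives exactly the same input. The idea is to take a single field from the incoming Pub/Sub stream and publish only that field to the output topic.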

{"field_1": "14726485", "field_2": "3947183"}

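import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    SetupOptions,
)

# CustomPipelineOptions (which provides inputSubscription and outputTopic)
# is the asker's own options class and is not shown in the question.


class ExtractStoreStock(beam.PTransform):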
    """A transform to extract a field
    """
    def __init__(self, field):
        super(ExtractStoreStock, self).__init__()
        self.field = field

    def expand(self, pcoll):
        return (pcoll
                | beam.Map(lambda elem: (elem[self.field])))


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    global cloud_options
    global custom_options

    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True
    )
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as pipeline:
        messages = (
                pipeline
                | "Read from PubSub"
                >> beam.io.ReadFromPubSub(subscription=custom_options.inputSubscription)
        )

        get_stores = messages | "get_store" >> ExtractStoreStock('field_1')

        get_stores | "Write to PubSub" >> beam.io.WriteToPubSub(topic=custom_options.outputTopic)

    # No explicit pipeline.run() is needed here: leaving the `with` block
    # already runs the pipeline.


if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    run()

I am new to Beam and Google Dataflow, and I don't know what to change in the transform to get the desired result.

【Comments】:

    Tags: python-3.x google-cloud-platform google-cloud-dataflow apache-beam


    【Solution 1】:

    You probably need to add a json.loads step to parse the byte string that you read from Pub/Sub.

    messages | beam.Map(json.loads) | "get_store" >> ExtractStoreStock('field_1')
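
To illustrate why the parsing step matters, here is a minimal sketch that uses only the standard library. It assumes the message payload arrives as UTF-8 encoded JSON bytes, which is how ReadFromPubSub delivers elements when attributes are not requested. The variable names are illustrative and not part of the original question.

```python
import json

# Sample payload from the question, as raw bytes (assumed UTF-8 JSON).
raw = b'{"field_1": "14726485", "field_2": "3947183"}'

# Bytes cannot be indexed by a string key, so parse them into a dict first.
# json.loads accepts bytes directly on Python 3.6+.
record = json.loads(raw)

# Once parsed, the field lookup used in ExtractStoreStock works as intended.
field_1 = record["field_1"]
```
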
    

    You could also simplify it to:

    get_stores = (messages 
                  | beam.Map(json.loads) 
                  | beam.Map(lambda x: x['field_1']))
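
One further point that the answer does not cover: WriteToPubSub expects each element to be bytes, whereas the extracted field is a Python string. A possible way to handle both directions in one step is sketched below. The helper name `extract_field` is hypothetical, and the sketch assumes UTF-8 JSON payloads.

```python
import json


def extract_field(message: bytes, field: str) -> bytes:
    """Parse a JSON payload and return one field, encoded back to bytes.

    Hypothetical helper: assumes `message` is UTF-8 encoded JSON and that
    the output topic should receive the field value as UTF-8 text.
    """
    record = json.loads(message)
    # str() also covers non-string JSON values such as numbers.
    return str(record[field]).encode("utf-8")
```

Under these assumptions it could be used in the pipeline as `messages | beam.Map(extract_field, 'field_1')`, since beam.Map passes extra positional arguments through to the callable.
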
    

    【Discussion】:
