[Posted]: 2021-06-25 00:18:23
[Question]:
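The goal of this pipeline is to understand how a PTransform works in a Pub/Sub-to-Pub/Sub Python pipeline. I supplied the input below, but the output Pub/Sub topic received the same input unchanged. The idea is to take just one field from the incoming Pub/Sub stream and send only that field to the output topic.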
{"field_1": "14726485", "field_2": "3947183"}
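import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, SetupOptions)


class ExtractStoreStock(beam.PTransform):
    """A transform to extract a field."""

    def __init__(self, field):
        super(ExtractStoreStock, self).__init__()
        self.field = field

    def expand(self, pcoll):
        return (pcoll
                | beam.Map(lambda elem: (elem[self.field])))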
def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    global cloud_options
    global custom_options

    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True
    )
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as pipeline:
        messages = (
            pipeline
            | "Read from PubSub"
            >> beam.io.ReadFromPubSub(subscription=custom_options.inputSubscription)
        )
        get_stores = messages | "get_store" >> ExtractStoreStock('field_1')
        get_stores | "Write to PubSub" >> beam.io.WriteToPubSub(topic=custom_options.outputTopic)
        pipeline.run()


if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)
    run()
I am new to Beam and Google Dataflow, and I don't know what to change in the transform to get the desired result.
[Discussion]:
Tags: python-3.x google-cloud-platform google-cloud-dataflow apache-beam
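One likely gap, offered as a sketch rather than a confirmed diagnosis: with default settings, beam.io.ReadFromPubSub emits each message as raw bytes, and beam.io.WriteToPubSub expects bytes, so the JSON has to be decoded before a field can be looked up and the value re-encoded before it is written. The helper below, extract_field, is a hypothetical name that is not in the original code, and it assumes the payload is UTF-8 JSON shaped like the sample input.

```python
import json


def extract_field(message: bytes, field: str) -> bytes:
    """Decode a Pub/Sub payload, pick one JSON field, and re-encode it as bytes.

    Hypothetical helper: assumes the payload is a UTF-8 encoded JSON object.
    """
    record = json.loads(message.decode("utf-8"))  # bytes -> str -> dict
    return str(record[field]).encode("utf-8")     # value -> str -> bytes


# Sample payload from the question, in the form Pub/Sub would deliver it.
payload = b'{"field_1": "14726485", "field_2": "3947183"}'
result = extract_field(payload, "field_1")
print(result)  # b'14726485'
```

Inside expand, this could replace the lambda as beam.Map(extract_field, self.field), since beam.Map forwards extra positional arguments to the callable. Separately, the with beam.Pipeline(...) block already runs the pipeline when it exits, so the explicit pipeline.run() call is probably unnecessary.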