【问题标题】:Deploying a Dataflow Pipeline using Python and Apache Beam使用 Python 和 Apache Beam 部署数据流管道
【发布时间】:2019-04-23 12:30:35
【问题描述】:

我不熟悉使用 Apache Beam 和 Dataflow。我想使用数据集作为将使用 Dataflow 并行部署的函数的输入。这是我目前所拥有的:

import os
import apache_beam as beam
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '[location of json service credentails]'

dataflow_options = ['--project=[PROJECT NAME]',
                    '--job_name=[JOB NAME]',
                    '--temp_location=gs://[BUCKET NAME]/temp',
                    '--staging_location=gs://[BUCKET NAME]/stage']
options = PipelineOptions(dataflow_options)
gcloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'dataflow'

with beam.Pipeline(options=options) as p:
     new_p = p | beam.io.ReadFromText(file_pattern='[file location].csv',
                                      skip_header_lines=1)
               | beam.ParDo([Function Name]())

CSV 文件将有 4 列 n 行。每行代表一个实例,每列代表该实例的一个参数。我想将一个实例的所有参数放入一个 beam.DoFn 中,这样我就可以在数据流的帮助下在多台机器上运行它。

如何编写函数以从 PCollection 中获取多个参数?下面的函数是我想象中的样子。

class function_name(beam.DoFn):
    def process(self, col_1, col_2, col_3, col_4):
    function = function(col_1) + function(col_2) + function(col_3) + function(col_4)
    return [function]

【问题讨论】:

  • Beam 具有PCollection 的概念,由element 组成,在您的示例中,csv 文件是逐行读取的,每一行都是一个元素,将被 映射 隐含在 ParDo 步骤中的可调用对象。您的 process 方法中不需要多个参数,您只需要一个参数,在这种情况下将是一个字符串,例如“col1_value, col2_value, col3_value, col4_value”,您需要对其进行拆分和处理,然后作为新的单个元素返回。如果要返回多个值,请使用元组、字典或其他集合作为返回元素。

标签: python google-cloud-dataflow apache-beam


【解决方案1】:

ReadFromText 的具体化返回将是一个 PCollection,其中字符串仍然是定界的。

您的 ParDo 应该采用 String 的一个元素,然后进行拆分,您可以将其作为 col 名称和值的 Dict 产生。

【讨论】:

    猜你喜欢
    • 2022-08-16
    • 2021-06-27
    • 1970-01-01
    • 2020-05-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多