【发布时间】:2019-04-23 12:30:35
【问题描述】:
我不熟悉使用 Apache Beam 和 Dataflow。我想使用数据集作为将使用 Dataflow 并行部署的函数的输入。这是我目前所拥有的:
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '[location of json service credentails]'
dataflow_options = ['--project=[PROJECT NAME]',
'--job_name=[JOB NAME]',
'--temp_location=gs://[BUCKET NAME]/temp',
'--staging_location=gs://[BUCKET NAME]/stage']
options = PipelineOptions(dataflow_options)
gcloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'dataflow'
with beam.Pipeline(options=options) as p:
new_p = p | beam.io.ReadFromText(file_pattern='[file location].csv',
skip_header_lines=1)
| beam.ParDo([Function Name]())
CSV 文件将有 4 列 n 行。每行代表一个实例,每列代表该实例的一个参数。我想将一个实例的所有参数放入一个 beam.DoFn 中,这样我就可以在数据流的帮助下在多台机器上运行它。
如何编写函数以从 PCollection 中获取多个参数?下面的函数是我想象中的样子。
class function_name(beam.DoFn):
def process(self, col_1, col_2, col_3, col_4):
function = function(col_1) + function(col_2) + function(col_3) + function(col_4)
return [function]
【问题讨论】:
-
Beam 具有
PCollection的概念,由element组成,在您的示例中,csv 文件是逐行读取的,每一行都是一个元素,将被 映射 隐含在 ParDo 步骤中的可调用对象。您的process方法中不需要多个参数,您只需要一个参数,在这种情况下将是一个字符串,例如“col1_value, col2_value, col3_value, col4_value”,您需要对其进行拆分和处理,然后作为新的单个元素返回。如果要返回多个值,请使用元组、字典或其他集合作为返回元素。
标签: python google-cloud-dataflow apache-beam