【问题标题】:Apache Beam Pipeline to read from REST API runs locally but not on Dataflow从 REST API 读取的 Apache Beam 管道在本地运行,但不在 Dataflow 上
【发布时间】:2021-01-05 15:50:33
【问题描述】:

我一直在尝试让我的管道在 Dataflow 上使用经典模板运行。

管道应该读取运行时参数 from_dateto_date 并将它们传递给 REST API。然后应该将从 API 返回的答案写入 bigquery 表中。

它在 Dataflow 上运行没有任何错误,但我的数据确实根本没有出现在作为数据接收器的 gbq 表中。 当我在本地执行它时,它就像一个魅力:没有错误,我可以使用服务帐户和本地文件写入 gbq。

我怀疑我误解了不同环境中管道步骤可用的内容,并且实际上没有数据沿管道传递。

requests 包可能在 Dataflow 运行器上不可用,但我预计会出现错误消息...

当我尝试在数据流上运行它但写入文本时(下面的注释行),在云存储上创建了一个文件夹,但里面没有文件出现。

我还怀疑这就是为什么我无法让任何调试消息显示在监控 UI 中。

非常感谢您的帮助 - 这是我的管道代码:

#!/usr/bin/env python
# coding: utf-8

import logging
import argparse

# Beam/Dataflow related imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.value_provider import RuntimeValueProvider

# Handling of API calls
import requests
import json


class get_api_data(beam.DoFn):
    def __init__(self):
        logging.debug("fetching api data")

    def process(self, dates):

        bearer_token = "api_secret"

        from_date = str(dates[0])
        to_date = str(dates[1])

        logging.debug("Now fetching from ", from_date, " to ", to_date)

        payload = {'stuff': 'stuff',
                   'from': from_date,
                   'to': to_date,
                   'other_stuff': 'other_stuff'
                   }

        payload = json.dumps(payload)

        headers = {
                  'Content-Type': 'application/json',
                  'Authorization': 'Bearer ' + bearer_token,
                  'Accept': 'application/json',
                  'Content-Type': 'application/json'
                  }

        r = requests.post("api_url", data= payload, headers=headers)

        return [line.decode("utf-8") for line in r.iter_lines()][1:]


class Split(beam.DoFn):
    def process(self, element):

        try:
            pid, date, another_kpi, yet_another_kpi = element.split(",")
            logging.debug(" | ".join(element.split(",")) )
        except ValueError:
            logging.error(" | ".join(element.split(",")) )

        return [{
            'pid':str(pid),
            'date':str(date),
            'another_kpi':int(another_kpi),
            'yet_another_kpi':float(yet_another_kpi)
        }]


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        
        parser.add_value_provider_argument('--to_date', dest='to_date', type=str) 
        parser.add_value_provider_argument('--from_date', dest='from_date', type=str)


def run(argv=None):
  
    parser = argparse.ArgumentParser()
    path_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    print("Google Cloud Options: ", pipeline_options.view_as(GoogleCloudOptions))

    from_date = pipeline_options.view_as(UserOptions).from_date
    to_date = pipeline_options.view_as(UserOptions).to_date

    logging.debug("Data from ", from_date, " to ", to_date)

    table_spec = bigquery.TableReference(
        projectId='my_project',
        datasetId='my_dataset',
        tableId='my_table')

    table_schema = 'pid:STRING, date:STRING, another_kpi:INT64, yet_another_kpi:FLOAT64'

    p1 = beam.Pipeline(options=pipeline_options)

    ingest_data = (
        p1
        | 'pass dates' >> beam.Create([[from_date, to_date]])
        | 'fetch API data' >> beam.ParDo(get_api_data()) 
        | 'split records' >> beam.ParDo(Split())
        | 'write into gbq' >> beam.io.gcp.bigquery.WriteToBigQuery(table = table_spec, schema=table_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ,create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        #| 'write to text' >> beam.io.WriteToText("./test_v2.csv")
    )

    result = p1.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    run()

【问题讨论】:

标签: python google-cloud-dataflow apache-beam


【解决方案1】:

显然禁止将 ValueProvider 与 Create 结合使用,尽管我没有收到错误消息。

我解决了这个问题:

class OutputValueProviderFn(beam.DoFn):
    def __init__(self, vp1, vp2):
       self.vp1 = vp1
       self.vp2 = vp2

    def process(self, unused_elm):
        logging.info("Providing dates: ", self.vp1.get(), self.vp2.get() )
        yield [self.vp1.get(), self.vp2.get()]
...

from_date = pipeline_options.view_as(UserOptions).from_date
to_date = pipeline_options.view_as(UserOptions).to_date

pipel = (
        p1
        | 'Start Pipeline' >> beam.Create([None])
        | 'Read from and to date' >> beam.ParDo(OutputValueProviderFn(from_date, to_date))
        | 'fetch API data' >> beam.ParDo(get_api_data())
        ...
    )

灵感here

【讨论】:

    猜你喜欢
    • 2019-04-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-01
    • 2020-06-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多