【问题标题】:TypeError when connecting to Google Cloud BigQuery from Apache Beam Dataflow in Python?从 Python 中的 Apache Beam Dataflow 连接到 Google Cloud BigQuery 时出现 TypeError?
【发布时间】:2019-06-04 16:32:47
【问题描述】:

当尝试在 apache Beam 的谷歌云数据流中初始化 python BigQuery Client() 时,它给了我一个类型错误:

TypeError('__init__() takes 2 positional arguments but 3 were given')

我正在使用 Python 3.7 和 Apache Beam 数据流,我必须手动初始化客户端并写入 BigQuery,而不是使用 ptransform,因为我想使用通过运行时参数传递的动态表名。

我尝试将项目和凭据传递给客户端,但它似乎没有做任何事情。此外,如果我使用 google-cloud-bigquery==1.11.2 而不是 1.13.0 它可以正常工作,在 apache Beam 之外使用 1.13.0 也可以完全正常工作。

我显然已经剪掉了一些代码,但这基本上是引发错误的原因

class SaveObjectsBigQuery(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # Establish BigQuery client
        client = bigquery.Client(project=project)


def run():
    pipeline_options = PipelineOptions()

    # GoogleCloud options object
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)

    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        _data = (p
                 | "Create" >> beam.Create(["Start"])
                 )

        save_data_bigquery = _data | "Save to BigQuery" >> beam.ParDo(SaveObjectsBigQuery())

在 google-cloud-bigquery 的早期版本中,这可以正常工作,并且我可以使用运行时参数和 insert_rows_json 创建一个表,而不会出现任何问题。显然,使用 WriteToBigquery Ptransform 是理想的,但由于需要动态命名 bigquery 表,这是不可能的。

编辑:

我更新了代码以尝试取出运行时值提供程序和 lambda 函数,尽管两者都收到了类似的错误:

`AttributeError: 'function/RuntimeValueProvider' 对象没有属性 'tableId'

我本质上是在启动数据流模板时尝试使用运行时值提供程序,以使用 WriteToBigQuery PTransform 动态命名 bigquery 表。

save_data_bigquery = _data | WriteToBigQuery(
            project=project,
            dataset="campaign_contact",
            table=value_provider.RuntimeValueProvider(option_name="table", default_value=None, value_type=str),
            schema="id:STRING",
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=BigQueryDisposition.WRITE_APPEND
        )
save_data_bigquery = _data | WriteToBigQuery(
            table=lambda table: f"{project}:dataset.{runtime_options.table}",
            schema="id:STRING",
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=BigQueryDisposition.WRITE_APPEND
        )

【问题讨论】:

  • 你让我们猜测错误在哪里。请编辑问题以包含完整的错误回溯消息。

标签: python google-bigquery google-cloud-dataflow apache-beam


【解决方案1】:

从 Beam 2.12 开始,您可以使用 WriteToBigQuery 转换动态分配目标。我建议你试试看:)

查看 Beam 代码库中的 this test,其中显示了此示例。

【讨论】:

  • 感谢您的回答 Pablo,这也适用于 RuntimeValueProviders 吗?
  • 我将管道作为模板启动,我尝试使用 RuntimeValueProvider 选项,或使用 lambda 函数尝试获取变量,尽管它返回 AttributeError: 'function' object has no attribute 'tableId'
  • 谢谢 Pablo,我刚刚编辑了问题以包含一些示例代码。
  • 哦,呃-我很抱歉。您需要将 --experiments=use_beam_bq_sink 作为管道选项传递,因为该功能是实验性 atm。
猜你喜欢
  • 1970-01-01
  • 2019-05-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-12-01
  • 2018-10-17
  • 2018-01-25
  • 2019-02-15
相关资源
最近更新 更多