【问题标题】:ValueError: A BigQuery table or a query must be specifiedValueError:必须指定 BigQuery 表或查询
【发布时间】:2019-12-23 04:36:12
【问题描述】:

我的云功能是根据某些规则形成动态查询并将其存储在文件中到云存储中,然后它将调用我的数据流模板。我将 inputfile 作为 ValueProvider 传递给我的数据流模板,该模板正在保存一个查询,进一步我正在尝试通过 beam.io.BigQuerySource 在我的管道中使用该查询。但它给了我一个错误:ValueError: A BigQuery table or a query must be specified

一些云函数代码:

query_job = client.query(
        query,
        job_config=job_config)
    query_job.result()
    print('Query results loaded to table {}'.format(table_ref.path))
    file_name = '{}_RM_{}.csv'.format(unit, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')[:-3])
    destination_uri = "gs://test-bucket/{}".format(file_name)
    dataset_ref = client.dataset(dataset_id, project=PROJECT)
    table_ref = dataset_ref.table(table_name)

    extract_job = client.extract_table(
        table_ref,
        destination_uri)
    extract_job.result() #Extracts results to the GCS
    client.delete_table(table_ref) #Deletes table in BQ


    BODY = {
        "jobName": "{jobname}".format(jobname=JOBNAME),
        "parameters": {
            "inputFile": destination_uri
        },
        "environment": {
            "tempLocation": "gs://{bucket}/temp".format(bucket=BUCKET),
            "zone": "europe-west1-b"
        }
    }

    request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
    response = request.execute()

数据流代码:

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--inputFile', default='query.txt')


class Query:
    def query_final(self, inputFile):
        from google.cloud import storage
        client = storage.Client()
        bucket = client.get_bucket('ingka-retention-test-bucket')
        blob = bucket.get_blob(str(inputFile))
        return blob
def dataflow():
    options = PipelineOptions.from_dictionary(pipeline_options)
    user_options = options.view_as(UserOptions)

    inputFile = user_options.inputFile
    new_query = Query()
    final_query = new_query.query_final(inputFile)

    with beam.Pipeline(options=options) as p:
        rows = p | 'Read Orders from BigQuery ' >> beam.io.Read(beam.io.BigQuerySource(query=final_query, use_standard_sql=True))

完成此任务的原因或任何其他最佳方法是什么? 提前谢谢你!

【问题讨论】:

    标签: python google-bigquery google-cloud-functions google-cloud-dataflow apache-beam


    【解决方案1】:

    这对于 BigQuery 源是不可能的,因为该语句在图形编译时是必需的。源代码和设置在编译时被锁定。

    解决方案:您可以在 ParDo 中使用 BQ API 内联并参数化代码,类似于您在上面所做的。这是在运行时解释的。要启动您的 ParDo,您将构建一个 PCollection,其中包含一些与您想要拨打电话的 N 次相对应的项目。请记住,如果您的整个 ParDo 范围出现故障,您将不得不处理任何幂等性问题。

    [How to run dynamic second query in google cloud dataflow?

    【讨论】:

      猜你喜欢
      • 2021-09-23
      • 2021-03-16
      • 2021-04-08
      • 2023-03-27
      • 2017-05-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多