【问题标题】:Create a table from query results in Google BigQuery根据 Google BigQuery 中的查询结果创建表
【发布时间】:2013-01-15 08:41:30
【问题描述】:

我们通过 Python API 使用 Google BigQuery。如何从查询结果创建表(新表或覆盖旧表)?我查看了query documentation,但没有发现它有用。

我们要模拟:

“SELEC ... INTO ...”来自 ANSI SQL。

【问题讨论】:

    标签: python google-app-engine google-bigquery


    【解决方案1】:

    您可以通过在查询中指定目标表来执行此操作。您需要使用Jobs.insert API 而不是Jobs.query 调用,并且您应该指定writeDisposition=WRITE_APPEND 并填写目标表。

    如果您使用的是原始 API,下面是配置的样子。如果您使用的是 Python,Python 客户端应该为这些相同的字段提供访问器:

    "configuration": {
      "query": {
        "query": "select count(*) from foo.bar",
        "destinationTable": {
          "projectId": "my_project",
          "datasetId": "my_dataset",
          "tableId": "my_table"
        },
        "createDisposition": "CREATE_IF_NEEDED",
        "writeDisposition": "WRITE_APPEND",
      }
    }
    

    【讨论】:

    • 您能否提供工作主体数据示例(该配置的外观如何)?
    • 已添加,但是如果您使用的是 python API,您可能会通过客户端方法而不是原始 JSON 来设置配置。
    • 我正在尝试使用此类作业对 BigQuery 中的表进行非规范化。当查询的行数有限制(几十万)时,作业运行良好。如果没有 LIMIT 子句,作业会在几秒钟后失败,并显示: query FAILURE 作业执行期间遇到的错误。响应太大而无法返回。我怎样才能获得更多详细信息?我的查询是否太复杂? (30M 行表与另一个 70M 行表连接)。谢谢!
    • 我正在使用的查询:SELECT a1, a2, b1, b2, b3 FROM da1.a INNER JOIN ds1.b ON ds1.a.publicacao_publicacaoID = ds1.b.publicacaoID
    • 您可以通过指定查询的目标表(如果您使用 Web UI,您可以在查询窗格中选择“启用选项”)并设置 allowLargeResults 来获得完整的查询结果。跨度>
    【解决方案2】:

    接受的答案是正确的,但它没有提供 Python 代码来执行任务。这是一个示例,重构自我刚刚编写的一个小型自定义客户端类。它不处理异常,并且应该定制硬编码查询来做一些比SELECT *更有趣的事情......

    import time
    
    from google.cloud import bigquery
    from google.cloud.bigquery.table import Table
    from google.cloud.bigquery.dataset import Dataset
    
    
    class Client(object):
    
        def __init__(self, origin_project, origin_dataset, origin_table,
                     destination_dataset, destination_table):
            """
            A Client that performs a hardcoded SELECT and INSERTS the results in a
            user-specified location.
    
            All init args are strings. Note that the destination project is the
            default project from your Google Cloud configuration.
            """
            self.project = origin_project
            self.dataset = origin_dataset
            self.table = origin_table
            self.dest_dataset = destination_dataset
            self.dest_table_name = destination_table
            self.client = bigquery.Client()
    
        def run(self):
            query = ("SELECT * FROM `{project}.{dataset}.{table}`;".format(
                project=self.project, dataset=self.dataset, table=self.table))
    
            job_config = bigquery.QueryJobConfig()
    
            # Set configuration.query.destinationTable
            destination_dataset = self.client.dataset(self.dest_dataset)
            destination_table = destination_dataset.table(self.dest_table_name)
            job_config.destination = destination_table
    
            # Set configuration.query.createDisposition
            job_config.create_disposition = 'CREATE_IF_NEEDED'
    
            # Set configuration.query.writeDisposition
            job_config.write_disposition = 'WRITE_APPEND'
    
            # Start the query
            job = self.client.query(query, job_config=job_config)
    
            # Wait for the query to finish
            job.result()
    

    【讨论】:

    • 你知道如何设置目标表的模式,包括模式等吗?我试过了,但不包括模式。例如,在源表中,我有一些模式为“REQUIRED”的字段,但是当创建目标表时,所有字段都处于“NULLABLE”模式。
    【解决方案3】:

    根据 Google BigQuery 中的查询结果创建表格。假设您正在使用带有 Python 3 的 Jupyter Notebook,将解释以下步骤:

    1. 如何在 BQ 上创建新数据集(以保存结果)
    2. 如何在 BQ 上运行查询并将结果以表格格式保存在新数据集中

    在 BQ 上创建一个新的 DataSet:my_dataset

    bigquery_client  = bigquery.Client() #Create a BigQuery service object
    dataset_id = 'my_dataset' 
    dataset_ref = bigquery_client.dataset(dataset_id) # Create a DatasetReference using a chosen dataset ID.
    dataset = bigquery.Dataset(dataset_ref)  # Construct a full Dataset object to send to the API.
    dataset.location = 'US' # Specify the geographic location where the new dataset will reside. Remember this should be same location as that of source data set from where we are getting data to run a query
    
    # Send the dataset to the API for creation. Raises google.api_core.exceptions.AlreadyExists if the Dataset already exists within the project.
    dataset = bigquery_client.create_dataset(dataset)  # API request
    print('Dataset {} created.'.format(dataset.dataset_id))
    

    使用 Python 在 BQ 上运行查询:

    这里有两种类型:

    1. 允许大结果
    2. 查询而不提及大结果等。

    我在这里使用公共数据集:bigquery-public-data:hacker_news & Table id: cmets 来运行查询。

    允许大结果

    DestinationTableName='table_id1'  #Enter new table name you want to give
    !bq query --allow_large_results --destination_table=project_id:my_dataset.$DestinationTableName 'SELECT * FROM [bigquery-public-data:hacker_news.comments]'
    

    如果需要,此查询将允许较大的查询结果。

    不提--allow_large_results:

    DestinationTableName='table_id2'  #Enter new table name you want to give
    !bq query destination_table=project_id:my_dataset.$DestinationTableName 'SELECT * FROM [bigquery-public-data:hacker_news.comments] LIMIT 100'
    

    这适用于结果不会超过 Google BQ 文档中提到的限制的查询。

    输出:

    1. BQ 上名为 my_dataset 的新数据集
    2. 查询结果保存为 my_dataset 中的表

    注意:

    1. 这些查询是可以在终端上运行的命令(开头不带 !)。但是当我们使用 Python 来运行这些命令/查询时,我们正在使用 !。这将使我们也能够在 Python 程序中使用/运行命令。
    2. 还请为答案投票:)。谢谢。

    【讨论】:

      猜你喜欢
      • 2021-11-08
      • 2019-03-18
      • 2014-11-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多