【问题标题】:Python - BigQuery Temporary TablePython - BigQuery 临时表
【发布时间】:2019-12-09 06:36:41
【问题描述】:

是否可以使用 Python 将 Cloud Storage 中已有的数据导入 bigquery 中的临时表?我可以在 Python 中创建一个 BigQuery 临时表并向其中插入数据吗?

【问题讨论】:

  • 你的数据格式是什么,.csv?
  • 是的,可以,但是您需要执行一些步骤。你是通过APP引擎还是GCF,Local运行Python?
  • @ZacharyManesiotis,可以是 csv 或 json!
  • @LuizLai,我正在本地机器上进行测试,但想法是创建一个函数来处理内部气流

标签: python google-bigquery


【解决方案1】:

您只能在 bigquery 脚本或存储过程中创建临时表。

您可以做的是创建具有随机后缀名称和较短有效期的表。在我的示例中为一小时。示例函数创建临时表,只需要一个数据集作为参数。

from google.cloud import bigquery
import datetime, pytz, random

PROJECT = "myproject"


def get_temp_table(dataset: str, table_name: str = None, project=None) -> bigquery.Table:
    prefix = "temp"
    suffix = random.randint(10000, 99999)
    if not table_name:
        table_name = "noname"

    temp_table_name = f"{dataset}.{prefix}_{table_name}_{suffix}"
    if project:
        temp_table_name = f"{project}.{temp_table_name}"
    tmp_table_def = bigquery.Table(temp_table_name)
    tmp_table_def.expires = datetime.datetime.now(pytz.utc) + datetime.timedelta(
        hours=1
    )

    return tmp_table_def


client = bigquery.Client(project=PROJECT)

tmp_table_def = get_temp_table("mydataset", "new_users", project=PROJECT)
tmp_table_def.schema = [
    bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]
tmp_table = client.create_table(tmp_table_def)  # type: bigquery.Table

data = [
    {"id": "c-1234", "full_name": "John Smith", "age": 39},
    {"id": "c-1234", "full_name": "Patricia Smith", "age": 41},
]

errors = client.insert_rows(tmp_table, data)

print(f"Loaded {len(data)} rows into {tmp_table.dataset_id}:{tmp_table.table_id} with {len(errors)} errors")

【讨论】:

    【解决方案2】:

    (此草案不考虑临时表,但我认为可以提供帮助。) 我将它与谷歌云函数和 Python 3.7 一起使用,效果很好。

    from google.cloud import storage,bigquery
    import json
    import os
    import csv
    import io
    import pandas as pd
    
    def upload_dataframe_gbq(df,table_name):
        bq_client = bigquery.Client()
        dataset_id = 'your_dataset_id'
        dataset_ref = bq_client.dataset(dataset_id)
        table_ref = dataset_ref.table(table_name)
        job = bq_client.load_table_from_dataframe(df, table_ref)
        job.result()  # Waits for table load to complete.
        assert job.state == "DONE"
        table = bq_client.get_table(table_ref)
        print(table.num_rows)
    
    
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="your_credentials.json"
    client = storage.Client()
    bucket = client.get_bucket('your_bucket_name')
    blob = bucket.blob('sample.csv')
    content = blob.download_as_string()
    csv_content = BytesIO(content)
    df = pd.read_csv(csv_content, sep=",", header=0 )
    table_name = "your_big_query_table_name"
    upload_dataframe_gbq(df,table_name)
    

    【讨论】:

    • 我已经有一个永久表的流程,我只是想了解我需要做什么才能发送到临时表。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-01
    • 2020-07-19
    • 2014-01-07
    • 2017-01-01
    • 2016-10-22
    • 2022-10-25
    相关资源
    最近更新 更多