【问题标题】:JSON upload to BigQueryJSON 上传到 BigQuery
【发布时间】:2021-10-19 07:56:35
【问题描述】:

我正在尝试使用两个成功部署的云功能和一个成功运行的云调度程序自动将 JSON 数据上传到 BigQuery。运行云调度程序后,数据会上传到我的云存储,但不会上传到 BigQuery。

以下是我的代码和 JSON 数据:

# function 1 triggered by http
def function(request):
    url = "https://api...."
    headers = {"Content-Type" : "application/json",
            "Authorization" : "..."}
        
    response = requests.get(url, headers=headers)

    json_data = response.json()
    pretty_json = json.dumps(json_data, indent=4, sort_keys=True)

    storage_client = storage.Client()
    bucket = storage_client.bucket("bucket_name")
    blob = bucket.blob("blob_name")

    blob.upload_from_string(pretty_json)
# function 2 triggered by cloud storage -> event type finalize/create
def function_2(data, context):
    client = bigquery.Client()

    table_id = "booming-post-322920:dataset_name.table_name"

    job_config = bigquery.LoadJobConfig()
    job_config.schema=[
        bigquery.SchemaField("order_items", "INTEGER"),
        bigquery.SchemaField("created_at", "TIMESTAMP"),
        .....,     
        bigquery.SchemaField("updated_at", "TIMESTAMP")
    ]

    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON

    uri = 'gs://bucket_name/blob_name' 

    load_job = client.load_table_from_uri(
        uri,
        table_id,
        location="US",  
        job_config=job_config
    ) 

    load_job.result()  

这是我的 JSON 数据 pretty_json 的样子:

{
    "records": [
        {
            "active": null,
            "approved": null,
            "buyer": [
                1
            ],
            "cancel_reason": null,
            "cancelled": null,
            "chef": [
                1
            ],
            "completed": null,
            "created_at": "2021-07-15T17:44:31.064Z",
            ...

请指教。

【问题讨论】:

  • 您的意思是说 JSON 没有加载到 BigQuery 中吗?您的问题描述仅提及有关存储的问题。假设 BigQuery 没有加载数据,你是如何触发 function_2 的?您是否同时运行这两个功能?想知道是否可能存在 json 仍在上传到存储桶且 function_2 已被触发的竞争条件。
  • 没有我对函数 2 的触发是云存储更新时。我怀疑 JSON 格式和 bigquery 可能存在问题。
  • @CaioT 是 JSON 未加载到 BigQuery 中
  • 你从哪里得到变量值 'blob_name' 来构建 URI uri = 'gs://bucket_name/blob_name' ?

标签: python google-cloud-platform google-cloud-storage


【解决方案1】:

我认为主要问题是您的 JSON 文件的格式:您按照 BigQuery 的要求指定换行符分隔的 JSON 格式 (bigquery.SourceFormat.NEWLINE_DELIMITED_JSON),但您的 JSON 不符合该格式。

请考虑对您的第一个函数进行以下修改:

def function(request):
    url = "https://api...."
    headers = {"Content-Type" : "application/json",
            "Authorization" : "..."}
        
    response = requests.get(url, headers=headers)

    json_data = response.json()
    
    records = [json.dumps(record) for record in json_data["records"]]
    records_data = "\n".join(records)

    storage_client = storage.Client()
    bucket = storage_client.bucket("bucket_name")
    blob = bucket.blob("blob_name")

    blob.upload_from_string(records_data)

您的 JSON 现在将如下所示:

{"active": null, "approved": null, "buyer": [1], "cancel_reason": null, "cancelled": null, "chef": [1], "completed": null, "created_at": "2021-07-15T17:44:31.064Z", "delivery": false, "delivery_address": null, "delivery_fee": null, "delivery_instructions": null, "discount": 0, "id": 1, "name": "Oak's Order", "notes": null, "order_delivery_time": null, "order_id": null, "order_ready_time": null, "order_submitted_time": null, "paid": null, "pickup_address": "", "promo_applied": null, "promo_code": null, "rated": null, "ratings": null, "review": null, "seller": [1], "status": "In Process", "tax": null, "tip": 0, "total": null, "type": "Pick Up", "updated_at": "2021-07-15T17:44:31.064Z"}
{"active": null, "approved": null, "buyer": [2], "cancel_reason": null, "cancelled": null, "chef": [1], "completed": null, "created_at": "2021-07-15T17:52:53.729Z", "delivery": false, "delivery_address": null, "delivery_fee": null, "delivery_instructions": null, "discount": 0, "id": 2, "name": "Shuu's Order", "notes": null, "order_delivery_time": null, "order_id": null, "order_ready_time": null, "order_submitted_time": null, "paid": null, "pickup_address": "", "promo_applied": null, "promo_code": null, "rated": null, "ratings": null, "review": null, "seller": [1], "status": "In Process", "tax": null, "tip": 0, "total": null, "type": "Pick Up", "updated_at": "2021-07-15T17:52:53.729Z"}

此外,在您的第二个函数中,正如@CaioT 在他/她的评论中所指出的那样,您需要根据GCS storage trigger event definition 更改函数签名以接受两个参数eventcontext .

此外,请考虑根据您的 JSON 字段不存在来查看 BigQuery 架构定义中 order_items 字段的定义。

在导入 JSON 数据时也要注意 BigQuery 强加的limitations,尤其是在处理时间戳时。

最后,确保您的函数具有与 BigQuery 交互所需的权限。

By default,虽然您也可以提供specific service account,但您的函数将在运行时使用您的 App Engine 服务帐户。确保在任何情况下,服务帐户都拥有 BigQuery 和您的 BigQuery 表所需的 permissions。基本上,您的服务帐户必须是数据集的 bigquery.userWRITER(或等效的 bigquery.dataEditor)。请参阅GCP documentation 中提供的示例。

【讨论】:

  • 我更新了 JSON 格式。仍然没有数据上传到 BigQuery。我的 function_2 触发器是 Cloud Storage -> Finalize/Create
  • 我很高兴听到问题已部分解决。我会尝试调试你的功能。您可以通过多种方式查看函数执行日志(浏览器 UI、gcloud 命令行)。请参阅例如this related SO question
  • 谢谢。现在我看到我在 function_2 TypeError: function_2() takes 1 positional argument but 2 were given 中有这个错误,虽然它部署成功。
  • 你的 function_2 应该有两个参数(如果是 GCS PubSub 通知):function_2(event, context)
  • 知道了,解决了。仍然没有运气。 function_1 由 http 触发。 function_2 由云存储触发所以 function_2(data, context)
猜你喜欢
  • 1970-01-01
  • 2018-11-02
  • 1970-01-01
  • 2017-12-03
  • 2016-08-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多