【问题标题】:For loop to write several files in cloud storage from cloud functionFor循环从云功能将多个文件写入云存储
【发布时间】:2020-03-25 10:04:31
【问题描述】:

我在python中有一个云函数来调用rest api get。函数响应为传递给函数的每个 json 文件名返回一个 json 文件。该函数对单个调用完美运行:

def request_func(x):
    url = 'https://use4.dm-us.informaticacloud.com/saas/api/v2/'+ x
    payload_headers = {'content-type': 'application/json','Accept': 'application/json','icSessionId':ricSessionId}
    r = requests.get(url,headers=payload_headers)
    data = r.json()
    return data

def upload_blob(bucket_name, blob_text, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_string(blob_text)

wf_name = 'JOB_NAME'

data = request_func('workflow/name/'+ wf_name)
rid = data['id']
data = request_func('activity/activityLog?taskId='+rid)

def monitoramento(request):
    BUCKET_NAME = 'bucketname'
    BLOB_NAME = wf_name
    BLOB_STR = json.dumps(data)
    upload_blob(BUCKET_NAME, BLOB_STR, BLOB_NAME)
    return f'{wf_name} forecast json file success uploaded!'

我需要将几个 json 文件传递​​给函数,然后我对函数进行了一些调整以使用 FOR LOOP:

def request_func(x):
    url = 'https://use4.dm-us.informaticacloud.com/saas/api/v2/'+ x
    payload_headers = {'content-type': 'application/json','Accept': 'application/json','icSessionId':ricSessionId}
    r = requests.get(url,headers=payload_headers)
    data = r.json()
    return data

def upload_blob(bucket_name, blob_text, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_string(blob_text)

list_jobs = ['JOB_NAME1','JOB_NAME2','JOB_NAME3']

for wf_name in list_jobs:

    data = request_func('workflow/name/'+ wf_name)
    rid = data['id']
    data = request_func('activity/activityLog?taskId='+rid)

    def monitoramento(request):
        BUCKET_NAME = 'bucketname'
        BLOB_NAME = wf_name
        BLOB_STR = json.dumps(data)
        upload_blob(BUCKET_NAME, BLOB_STR, BLOB_NAME)

问题是只有列表的最后一个对象,JOB_NAME3 json 文件,被写入存储桶! 我做错了什么? 我省略了一些代码。

【问题讨论】:

  • monitoramento() 是从哪里调用的?提示:不要省略代码。在寻求帮助时创建一个最小且可重复的示例。
  • 请不要使用全部大写来发布您的信息。您能否对其进行编辑以使其看起来不像大喊大叫?
  • 我建议您参加 Stackoverflow 之旅:stackoverflow.com/tour。要创建好的编程问题:stackoverflow.com/help/minimal-reproducible-example
  • 你为什么要在循环中定义一个函数?!据我所知,该功能甚至从未运行过。
  • @RobertAlmeida 它不应该在运行,你确定是吗?其他一些函数可以创建类似的输出吗?

标签: python google-cloud-platform google-cloud-functions google-cloud-storage


【解决方案1】:

使用@AlexanderCécile 提示我可以解决我的问题:

def request_func(x):
    url = 'https://use4.dm-us.informaticacloud.com/saas/api/v2/'+ x
    payload_headers = {'content-type': 'application/json','Accept': 'application/json','icSessionId':ricSessionId}
    r = requests.get(url,headers=payload_headers)
    data = r.json()
    return data

def upload_blob(bucket_name, blob_text, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_string(blob_text)

list_jobs = ['JOB_NAME1','JOB_NAME2','JOB_NAME3']

for wf_name in list_jobs:
    data = request_func('workflow/name/'+ wf_name)
    rid = data['id']
    data = request_func('activity/activityLog?taskId='+rid)
    BUCKET_NAME = 'monitoramento'
    BLOB_NAME = wf_name
    BLOB_STR = json.dumps(data)
    upload_blob(BUCKET_NAME, BLOB_STR, BLOB_NAME)

【讨论】:

    猜你喜欢
    • 2019-08-08
    • 2018-11-13
    • 1970-01-01
    • 2020-04-25
    • 2017-12-21
    • 1970-01-01
    • 2018-12-24
    • 2021-10-28
    • 1970-01-01
    相关资源
    最近更新 更多