【问题标题】:using dask for sending parallel API request and error handling使用 dask 发送并行 API 请求和错误处理
【发布时间】:2018-10-01 21:04:18
【问题描述】:

我最近开始使用 dask。我想使用 http 请求将数据发送到 REST API,API 返回一个 json 文件以验证数据上传是否成功。这是我的 API 调用函数:

def requestToAPI():
    headers={'Content-Type': 'application/json'}
    data = {
      "api_key" : "xxxxxxxxxxxxx",
      "attributes" : [
       {
         "external_id" : "user1",
         "app_id" : "xxxx-xxx-xxxxx-xxxx",
         "firs_name" : "user_firstname",
         "last_name" : "user_lastname_test"
       }
     ]
    }
    r = requests.post('https://abcdf.com/users/abdcgdu', headers=headers, data=json.dumps(data))
    return r.json()

我有一些从以下代码中获得的 dask 数据帧块:

 rChunk=dd.from_pandas(pandaDataFrame, chunksize=1000)

如何使用 dask 并使用上述块(假设每个块将更改为正确的 json 文件)向 API 发送并行请求并在其中一个请求失败/返回错误时进行正确的错误处理?

我尝试使用 dask.delayed:

[延迟(requestToAPI)(chunk) 用于 rChunk 中的块]

但不确定如何进行正确的错误处理??

【问题讨论】:

  • 出现错误时您希望做什么?
  • @mdurant 只是打印出错误

标签: python api parallel-processing dask


【解决方案1】:

我不确定 dask 数据框是否是您的应用程序的最佳选择。您可能想查看延迟、期货或包 API。

我可能会使用 concurrent.futures

from dask.distributed import Client, as_completed

futures = client.map(process, requests)
for future in as_completed(futures):
    try:
        response = future.result()
        # do stuff with result
    except Exeption:
        # do stuff

http://docs.dask.org/en/latest/futures.html

【讨论】:

  • 这里的流程和要求是什么?
  • 过程=requestToAPI;请求=rChunk
猜你喜欢
  • 2019-05-13
  • 1970-01-01
  • 1970-01-01
  • 2017-11-20
  • 1970-01-01
  • 1970-01-01
  • 2023-02-15
  • 1970-01-01
  • 2022-06-11
相关资源
最近更新 更多