【问题标题】:Python - How to - Big Query asynchronous tasksPython - 如何 - Big Query 异步任务
【发布时间】:2019-04-03 13:20:42
【问题描述】:

这可能是一个愚蠢的问题,但我似乎无法异步运行 python google-clood-bigquery。

我的目标是同时运行多个查询并等待所有查询在asyncio.wait() 查询收集器中完成。我正在使用asyncio.create_tast() 来启动查询。 问题是每个查询都在开始之前等待前一个查询完成。

这是我的查询函数(很简单):

async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
  job = self.api.query(query, **kwargs)
  return job.result()

既然我不能等待job.result(),我应该等待别的吗?

【问题讨论】:

  • 必须使用asyncio吗? Bigquery 的 python api 不支持异步 yeild,因此更好的选择可能是在某些 ThreadPool 执行器的后台执行查询。
  • 我刚开始使用 python,但是是否可以使用您的方法将查询包装为异步调用?

标签: python async-await google-bigquery


【解决方案1】:

如果您在coroutine 内部工作,并且希望在不阻塞event_loop 的情况下运行不同的查询,那么您可以使用run_in_executor 函数,它基本上在后台线程中运行您的查询而不会阻塞循环。 Here's 一个很好的例子来说明如何使用它。

确保这正是您所需要的;为在 Python API 中运行查询而创建的作业已经是异步的,它们只会在您调用 job.result() 时阻塞。这意味着您不需要使用asyncio,除非您在协程中。

这是一个在作业完成后立即检索结果的快速示例:

from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq


client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'

threads = []
results = []

executor = ThreadPoolExecutor(5)

for job in [client.query(query1), client.query(query2)]:
    threads.append(executor.submit(job.result))

# Here you can run any code you like. The interpreter is free

for future in as_completed(threads):
    results.append(list(future.result()))

results 将是:

[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]

【讨论】:

    【解决方案2】:

    只是为了分享一个不同的解决方案:

    import numpy as np
    from time import sleep
    
    
    query1 = """
    SELECT
      language.name,
      average(language.bytes)
    FROM `bigquery-public-data.github_repos.languages` 
    , UNNEST(language) AS language
    GROUP BY language.name"""
    query2 = 'SELECT 2'
    
    
    def dummy_callback(future):
        global jobs_done
        jobs_done[future.job_id] = True
    
    
    jobs = [bq.query(query1), bq.query(query2)]
    jobs_done = {job.job_id: False for job in jobs}
    [job.add_done_callback(dummy_callback) for job in jobs]
    
    # blocking loop to wait for jobs to finish
    while not (np.all(list(jobs_done.values()))):
        print('waiting for jobs to finish ... sleeping for 1s')
        sleep(1)
    
    print('all jobs done, do your stuff')
    

    比起使用as_completed,我更喜欢使用 bigquery 作业本身的内置异步功能。这也使我可以将数据管道分解为单独的云函数,而不必在整个管道期间保持主 ThreadPoolExecutor 处于活动状态。顺便说一句,这就是我研究这个的原因:我的管道比 Cloud Functions 的 9 分钟的最大超时时间(甚至 Cloud Run 的 15 分钟)更长。

    缺点是我需要跟踪各种函数中的所有job_ids,但在配置管道时通过指定输入和输出以形成有向无环图,这相对容易解决。

    【讨论】:

    • 我试图用大约 200 个查询来复制它,每个查询大约需要 10 秒,所以我有 for q in queries: job = bq.query(query[q]) jobs_done[job.job_id] = False job.add_callback(dummycallback) print("Now wait") 我没有看到任何显着的加速。似乎缓慢的部分是启动查询本身。
    • 很可能是这种情况,请注意 BQ 启动时间大约为几秒钟。因此,如果您有一些小的查询,您不会看到太大的改进。
    【解决方案3】:

    事实上,感谢asyncio.create_task() 函数,我找到了一种将查询包装在异步调用中的方法。 我只需要将job.result() 包装在协程中;这是实现。它现在确实异步运行。

    class BQApi(object):                                                                                                 
        def __init__(self):                                                                                              
            self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])                               
    
        async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:                                       
            job = self.api.query(query, **kwargs)                                                                        
            task = asyncio.create_task(self.coroutine_job(job))                                                          
            return await task                                                                                            
    
        @staticmethod                                                                                                    
        async def coroutine_job(job):                                                                                    
            return job.result()                                                                                          
    

    【讨论】:

    • 我正在努力解决同样的问题@Antoine Dussarps。已在 google-cloud-python 上发布了一个问题,引用了此线程。 github.com/googleapis/google-cloud-python/issues/8726
    • 我认为这不是你想要的。 create_task 安排一些异步工作并让您 await 它。这样您就可以并行启动多个任务,但它不会将阻塞调用转换为 async 调用。当coroutine_job 被调用时,它仍然会阻塞事件循环。你想要的是通过to_thread在一个线程中运行它。
    猜你喜欢
    • 2022-09-25
    • 2019-12-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多