【问题标题】:Read from database using pd.read_sql and asyncio使用 pd.read_sql 和 asyncio 从数据库读取
【发布时间】:2021-10-25 17:46:34
【问题描述】:

我有三个表,每个表都需要大约 1 分钟来查询(即总共 3 分钟)

from my_utils import get_engine
import pandas as pd

def main():
   con1 = get_engine("table1")
   con2 = get_engine("table2")
   con3 = get_engine("table3")

   df1 = pd.read_sql(query1,con=con1)
   df2 = pd.read_sql(query2,con=con2)
   df3 = pd.read_sql(query3,con=con3)

main()

让天空“异步”。

因此我尝试了以下方法(我对使用 asyncio 比较陌生)

.
.
import asyncio

async def get_df1(query1):
   df1 = pd.read_sql(query1,con=con1)
   return df1

async def get_df2(query2):
   df2 = pd.read_sql(query2,con=con2)
   return df2

async def get_df3(query3):
   df3 = pd.read_sql(query3,con=con3)
   return df3

async def main():

 df1,df2,df3 = await asyncio.gather(get_df1(),get_df2(),get_df3())

asyncio.run(main())

它会运行,但它与同步运行所用的时间完全相同。

我错过了什么吗?

【问题讨论】:

标签: python pandas asynchronous python-asyncio


【解决方案1】:

协程之间的切换只发生在await 语句中,并且由于您的await 函数中没有awaits,您的三个查询将只能按顺序执行。由于pd.read_sql 本身不是异步的,因此您必须使用执行器将其包装以制作异步版本:

async def read_sql_async(stmt, con):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, pd.read_sql, stmt, con)

然后您就可以将 read_sql 作为可等待对象运行:

df1 = await read_sql_async(query1, con=con1)

【讨论】:

  • 这似乎可以解决问题!虽然我的一个查询正在停止(如果对 DB atm 有很多请求,我不会) - 但是在每个函数中创建循环是否存在一些问题,即如果您复制示例 10 次从而在每个函数中创建循环?
  • 您可以使用循环设置每个未来,然后再使用gather 执行它们(如果这就是你的意思?)例如如果您有查询和连接列表,您可以使用results = await asyncio.gather(*(read_sql_async(query, con) for query, con in zip(queries, connections))。如果这是一个非常长的列表,尽管您可能最好使用线程池将一次运行的查询数量限制在合理的范围内。
  • 但是,如果你在 get_df1 中有一个循环,循环中的步骤仍然按顺序执行,它们只允许与其他正在运行的协程交错。
  • 现在我对 postgreSQL 数据库的调用正在增加(很多)。我有 6 个调用(总共对三个不同的数据库) - Mysql 和 MSSQL 工作正常,但其中两个是对 PostgreSQL 的调用,它们使用异步非常慢
  • 如何将 chunksize 参数传递给 run_in_executor(None, pd.read_sql, stmt, con) @CutePoison 中的 read_sql 函数
猜你喜欢
  • 2016-02-09
  • 2019-04-14
  • 1970-01-01
  • 2021-01-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-07-07
相关资源
最近更新 更多