【Question title】: Python Postgres psycopg2 ThreadedConnectionPool exhausted
【Posted】: 2018-07-09 23:32:23
【Question】:

I have looked at several "too many clients" threads here but still can't solve my problem, so I have to ask again for my specific case.

Basically, I set up a local Postgres server and need to run tens of thousands of queries, so I used the Python psycopg2 package. Here is my code:

import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor

df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000) # repeat df 10000 times

DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)

def do_one_query(inputS, inputT):
    conn = tcp.getconn()
    c = conn.cursor()

    # Triple quotes so the double-quoted column names do not terminate the string.
    q = r"""SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;"""

    c.execute(q)
    all_results = c.fetchall()
    for row in all_results:
        return row
    tcp.putconn(conn, close=True)

cnt=0
for idx, row in df.iterrows():

    cnt+=1
    with ThreadPoolExecutor(max_workers=1) as pool:
        ret = pool.submit(do_one_query,  row["S"], row["T"])
        print ret.result()
    print cnt

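The code works fine while df is small. If I repeat df 10000 times, I get an error saying the connection pool is exhausted. I thought the connections I used had been closed by this line:

tcp.putconn(conn, close=True)

But I guess they are not actually closed? How can I get around this?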

【Comments】:

    Tags: python postgresql database-connection threadpool


    【Answer 1】:

    You need to use a queue on top of your pool.

    Something like the following should work:

    import gevent, sys, psycopg2, logging
    from contextlib import contextmanager
    from gevent.queue import Queue
    from gevent.socket import wait_read, wait_write
    from psycopg2.pool import ThreadedConnectionPool
    from psycopg2 import extensions, OperationalError
    logger = logging.getLogger(__name__)
    
    poolsize = 100  #number of max connections
    pdsn = '' # put your dsn here
    
    if sys.version_info[0] >= 3:
        integer_types = (int,)
    else:
        import __builtin__
        integer_types = (int, __builtin__.long)
    
       
    class ConnectorError(Exception):
        """ This is a base class for all CONNECTOR related exceptions """
        pass
    
    #simplified calls etc. db.fetchall(SQL, arg1, arg2...)
    def cursor(): return Pcursor()
    def fetchone(PSQL, *args): return Pcursor().fetchone(PSQL, *args)
    def fetchall(PSQL, *args): return Pcursor().fetchall(PSQL, *args)
    def execute(PSQL, *args): return Pcursor().execute(PSQL, *args)
    
    
    #singleton connection pool, gets reset if a connection is bad or drops
    _pgpool = None
    def pgpool():
        global _pgpool
        if not _pgpool:
            try:
                _pgpool = PostgresConnectionPool(maxsize=poolsize)
            except psycopg2.OperationalError as exc:
                _pgpool = None
        return _pgpool
    
    class Pcursor(object):
    
        def __init__(self, **kwargs):
            #in case of a lost connection lets sit and wait till it's online
            global _pgpool
            if not _pgpool:
                while not _pgpool:
                    try:
                        pgpool()
                    except:
                        logger.debug('Attempting Connection To Postgres...')
                        gevent.sleep(1)
    
        def fetchone(self, PSQL, *args):
            with _pgpool.cursor() as cursor:
                try:
                    cursor.execute(PSQL, args)
                except TypeError:
                    cursor.execute(PSQL, args[0])
                except Exception as exc:
                    print(sys._getframe().f_back.f_code)
                    print(sys._getframe().f_back.f_code.co_name)
                    logger.warning(str(exc))
                logger.debug(cursor.query)
                return cursor.fetchone()
    
        def fetchall(self, PSQL, *args):
            with _pgpool.cursor() as cursor:
                try:
                    cursor.execute(PSQL, args)
                except TypeError:
                    cursor.execute(PSQL, args[0])
                except Exception as exc:
                    print(sys._getframe().f_back.f_code)
                    print(sys._getframe().f_back.f_code.co_name)
                    logger.warning(str(exc))
                logger.debug(cursor.query)
                return cursor.fetchall()
    
        def execute(self, PSQL, *args):
            with _pgpool.cursor() as cursor:
                try:
                    cursor.execute(PSQL, args)
                except TypeError:
                    cursor.execute(PSQL, args[0])
                except Exception as exc:
                    print(sys._getframe().f_back.f_code)
                    print(sys._getframe().f_back.f_code.co_name)
                    logger.warning(str(exc))
                finally:
                    logger.debug(cursor.query)
                    return cursor.query
    
        def fetchmany(self, PSQL, *args):
            with _pgpool.cursor() as cursor:
                try:
                    cursor.execute(PSQL, args)
                except TypeError:
                    cursor.execute(PSQL, args[0])
                while 1:
                    items = cursor.fetchmany()
                    if not items:
                        break
                    for item in items:
                        yield item
    
    class AbstractDatabaseConnectionPool(object):
    
        def __init__(self, maxsize=poolsize):
            if not isinstance(maxsize, integer_types):
                raise TypeError('Expected integer, got %r' % (maxsize, ))
            self.maxsize = maxsize
            self.pool = Queue()
            self.size = 0
    
        def create_connection(self):
            #overridden by PostgresConnectionPool
            raise NotImplementedError()
    
        def get(self):
            pool = self.pool
            if self.size >= self.maxsize or pool.qsize():
                return pool.get()
    
            self.size += 1
            try:
                new_item = self.create_connection()
            except:
                self.size -= 1
                raise
            return new_item
    
        def put(self, item):
            self.pool.put(item)
    
        def closeall(self):
            while not self.pool.empty():
                conn = self.pool.get_nowait()
                try:
                    conn.close()
                except Exception:
                    pass
    
        @contextmanager
        def connection(self, isolation_level=None):
            conn = self.get()
            try:
                if isolation_level is not None:
                    if conn.isolation_level == isolation_level:
                        isolation_level = None
                    else:
                        conn.set_isolation_level(isolation_level)
                yield conn
            except:
                if conn.closed:
                    conn = None
                    self.closeall()
                raise
            else:
                if conn.closed:
                    raise OperationalError("Cannot commit because connection was closed: %r" % (conn, ))
            finally:
                if conn is not None and not conn.closed:
                    if isolation_level is not None:
                        conn.set_isolation_level(isolation_level)
                    self.put(conn)
    
        @contextmanager
        def cursor(self, *args, **kwargs):
            isolation_level = kwargs.pop('isolation_level', None)
            with self.connection(isolation_level) as conn:
                try:
                    yield conn.cursor(*args, **kwargs)
                except:
                    global _pgpool
                    _pgpool = None
                    del(self)
    
    
    class PostgresConnectionPool(AbstractDatabaseConnectionPool):
        def __init__(self,**kwargs):
            try:
                self.pconnect = ThreadedConnectionPool(1, poolsize, dsn=pdsn)
            except:
                global _pgpool
                _pgpool = None
                raise ConnectorError('Database Connection Failed')
            maxsize = kwargs.pop('maxsize', None)
            self.kwargs = kwargs
            AbstractDatabaseConnectionPool.__init__(self, maxsize)
    
        def create_connection(self):
            self.conn = self.pconnect.getconn()
            self.conn.autocommit = True
            return self.conn
    
    
    def gevent_wait_callback(conn, timeout=None):
        """A wait callback useful to allow gevent to work with Psycopg."""
        while 1:
            state = conn.poll()
            if state == extensions.POLL_OK:
                break
            elif state == extensions.POLL_READ:
                wait_read(conn.fileno(), timeout=timeout)
            elif state == extensions.POLL_WRITE:
                wait_write(conn.fileno(), timeout=timeout)
            else:
                raise ConnectorError("Bad result from poll: %r" % state)
    
    extensions.set_wait_callback(gevent_wait_callback)
    

    Then you can use your connection by calling:

    import db
    db.Pcursor().execute(PSQL, arg1, arg2, arg3)
    

    Basically, I borrowed the gevent example for async Postgres and modified it to support a thread pool via psycopg2.

    https://github.com/gevent/gevent/blob/master/examples/psycopg2_pool.py

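    I added what psycogreen does to the module, so all you need to do is import it and call the class. Each call to the class puts a new query on the queue, but only a pool of a fixed size is ever used. That way you don't run out of connections. This is essentially what PGBouncer does, which I think would also solve your problem.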

    https://pgbouncer.github.io/
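The idea of putting a queue in front of a fixed number of connections can be sketched with the standard library alone. This is only a minimal illustration and not the gevent-based code above: `QueuedPool` and its `connect` argument are hypothetical names, the sketch opens all of its connections eagerly, and it assumes Python 3 (the `queue` module).

```python
import queue
from contextlib import contextmanager


class QueuedPool:
    """Hypothetical helper: lend out at most `size` connections, blocking when none is free."""

    def __init__(self, connect, size):
        # `connect` is any zero-argument callable that returns a new connection.
        self._free = queue.Queue(maxsize=size)
        for _ in range(size):
            self._free.put(connect())

    @contextmanager
    def connection(self, timeout=None):
        # Blocks until a connection is free; raises queue.Empty if `timeout` expires.
        conn = self._free.get(timeout=timeout)
        try:
            yield conn
        finally:
            # Always hand the connection back, even if the caller raised.
            self._free.put(conn)
```

With psycopg2 this might be built as `QueuedPool(lambda: psycopg2.connect(DSN), size=10)`. Callers that arrive while all connections are in use then wait inside `connection()` instead of failing.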

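    【Comments】:

      【Answer 2】:

      I struggled to find really detailed information on how ThreadedConnectionPool works. https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html isn't bad, but it turns out that its claim that getconn blocks until a connection becomes available is incorrect. Checking the code, all ThreadedConnectionPool adds is a lock around the AbstractConnectionPool methods to prevent race conditions. If more than maxconn connections are requested at any point, a "connection pool exhausted" PoolError is raised.

      If you want something a bit simpler than the accepted answer, wrapping the methods in a semaphore that blocks until a connection becomes available should do the trick: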

      from psycopg2.pool import ThreadedConnectionPool
      from threading import Semaphore
      
      class ReallyThreadedConnectionPool(ThreadedConnectionPool):
          def __init__(self, minconn, maxconn, *args, **kwargs):
              self._semaphore = Semaphore(maxconn)
              super().__init__(minconn, maxconn, *args, **kwargs)
      
          def getconn(self, *args, **kwargs):
              self._semaphore.acquire()
              return super().getconn(*args, **kwargs)
      
          def putconn(self, *args, **kwargs):
              super().putconn(*args, **kwargs)
              self._semaphore.release()
      

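      【Comments】:

      • Just want to bump this answer further: this is the perfect solution to the misleadingly named "ThreadedConnectionPool".
      • This is why I like the queue solution. Your connection doesn't fail if it is blocked waiting, and you still get a response faster than by checking for open connections.
      • If super().getconn raises an exception, getconn should release the semaphore and re-raise. Under high traffic a semaphore may have starvation/fairness issues, but that probably isn't a problem for most applications.
      • @pasztorpisti It all depends... You could wrap the call in try/except and release the semaphore, but if you check the code, that could leave you with a connection established but the semaphore released (not that much can go wrong there). So the question is which is worse: too many open connections or a semaphore that was never released. Or you could take the view that it isn't your problem: your code does the right thing, and you can't be blamed if the module doesn't do the same.
      • @RuneLyngsoe In general I think it is reasonable to assume "transactional" behavior from functions that allocate resources. When I open a file, the open function doesn't (or shouldn't?) leak resources if it raises an exception. I would expect similar behavior from super().getconn and from your getconn. super().getconn may be buggy (and may leak resources when an exception occurs), but why would you put or leave a bug in your own code and brush it off by saying "super().getconn may be buggy anyway..." :-D A semaphore increment is also a resource that shouldn't be leaked.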
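The variant suggested in the comments, releasing the semaphore when getconn fails, might look like the sketch below. It is written as a mixin (a hypothetical name, `BlockingPoolMixin`) so that it does not depend on a database being available. It assumes the base class takes (minconn, maxconn, ...) and exposes getconn/putconn the way psycopg2's pools do. putconn is left as in the answer, so the trade-off discussed above still applies there.

```python
from threading import Semaphore


class BlockingPoolMixin:
    """Hypothetical mixin: block in getconn() instead of exceeding maxconn."""

    def __init__(self, minconn, maxconn, *args, **kwargs):
        self._semaphore = Semaphore(maxconn)
        super().__init__(minconn, maxconn, *args, **kwargs)

    def getconn(self, *args, **kwargs):
        self._semaphore.acquire()
        try:
            return super().getconn(*args, **kwargs)
        except BaseException:
            # No connection was handed out, so give the permit back.
            self._semaphore.release()
            raise

    def putconn(self, *args, **kwargs):
        super().putconn(*args, **kwargs)
        self._semaphore.release()


# Intended use (requires psycopg2):
# class BlockingThreadedPool(BlockingPoolMixin, ThreadedConnectionPool):
#     pass
```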
      【Answer 3】:

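      Your problem is that you are not actually returning the connection to the pool; you are closing it for good with

      tcp.putconn(conn, close=True)
      

      See the documentation here: http://initd.org/psycopg/docs/pool.html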

      If close is True, discard the connection from the pool.
      

      So if you put 800 connections into your pool, after 801 loops you will get the "exhausted" error because your connection pool size is zero.
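Independently of the close flag, note that in the question's function the `return row` inside the loop runs before putconn is reached, so whenever the query yields a row the connection is never handed back. A minimal sketch of one way to guarantee the hand-back is a try/finally around the query. Here `pool` is assumed to be any object with getconn/putconn (such as the question's `tcp`), and passing the values as query parameters is an illustrative change, not part of the original code.

```python
def do_one_query(pool, state, title):
    """Run one lookup and always return the connection to `pool`."""
    conn = pool.getconn()
    try:
        with conn.cursor() as cur:
            # %s placeholders let the driver quote the values.
            cur.execute(
                'SELECT * FROM eridata WHERE "State" = %s AND "Title" = %s LIMIT 1;',
                (state, title),
            )
            return cur.fetchone()  # first row, or None if nothing matched
    finally:
        # Runs on normal return and on exceptions alike.
        pool.putconn(conn)
```

Called as `do_one_query(tcp, row["S"], row["T"])`, the number of checked-out connections returns to zero after every call, so the loop can run any number of times with a small pool.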

      【Comments】:
