【问题标题】:Caching/reusing a DB connection for later view usage缓存/重用数据库连接以供以后查看使用
【发布时间】:2019-04-08 10:05:43
【问题描述】:

我正在保存用户的数据库连接。在他们第一次输入凭据时,我会执行以下操作:

self.conn = MySQLdb.connect (
    host = 'aaa',
    user = 'bbb',
    passwd = 'ccc',
    db = 'ddd',
    charset='utf8'
)
cursor = self.conn.cursor()
cursor.execute("SET NAMES utf8")
cursor.execute('SET CHARACTER SET utf8;')
cursor.execute('SET character_set_connection=utf8;')

然后我准备好conn 来处理所有用户的查询。但是,我不想每次加载 view 时都重新连接。我将如何存储这个“开放连接”,以便我可以在视图中执行以下操作:

def do_queries(request, sql):
    user = request.user
    conn = request.session['conn']
    cursor = request.session['cursor']
    cursor.execute(sql)

更新:似乎上述方法是不可能的,也不是好的做法,所以让我重新表述一下我正在尝试做的事情:

我有一个用户可以在输入凭据后使用的 sql 编辑器(想想 Navicat 或 SequelPro 之类的东西)。请注意,这是 NOT 默认的 django db 连接——我事先不知道凭据。现在,一旦用户“连接”,我希望他们能够进行尽可能多的查询,而不必每次他们这样做时都重新连接。例如——再次重复——像 Navicat 或 SequelPro。这将如何使用 python、django 或 mysql 完成?也许我不太明白这里有什么必要(缓存连接?连接池?等等),所以任何建议或帮助将不胜感激。

【问题讨论】:

  • @kungphu 为此,用例是我们有一个 sql 编辑器,有人输入他们的 sql 凭据,然后他们可以查询他们的数据库,我们显示结果(想想任何 sql gui)。您认为“存储”连接而无需重新连接的最佳方式是什么?
  • @kungphu -- 更新问题。
  • 嗯,一个简单的方法是不关闭如果你想重用会话:-)
  • @dnoeth -- 你能否演示一下在实践中如何做到这一点?
  • 您预计有多少用户同时处于活跃状态?此外,是否通过多个 http 请求发送多个查询?如果你在下面使用 Django,我会假设是这样。

标签: python sql database database-connection connection-pooling


【解决方案1】:

我实际上分享了我对这个确切问题的解决方案。我在这里所做的是创建一个可以指定最大值的连接池,然后通过此通道异步排队查询请求。这样你可以保持一定数量的连接打开,但它会排队和池异步并保持你习惯的速度。

这需要 gevent 和 postgres。

Python Postgres psycopg2 ThreadedConnectionPool exhausted

【讨论】:

  • @eatmeimandanish - 这是一个很好的答案,感谢分享。在性能方面,使用连接池比每次重新连接快多少?例如,a typical query that takes 34ms with connection pooling takes 107ms without it.——你有任何指标吗?
  • 池仅在无连接时连接。池连接后,只要队列已满,它就会保持打开状态。这是正常行为。您不希望连接挂在不执行任何操作的数据库中。
  • 我知道我说的是使用连接池和不使用连接池的时间差是多少?
  • 一个使用一定数量的实际连接并将这些连接重用于排队查询。如果没有池,则所有查询只需要一个永久连接,或者每个查询都需要一个新连接。
  • 谢谢——我理解这个概念。我是说:您是否有关于在每个查询上执行连接池和执行正常连接/断开连接之间的时间差异的基准(或者您可以执行基准)?时差是多少?
【解决方案2】:

在 Web 应用程序上下文中同步执行此类操作不是一个好主意。请记住,您的应用程序可能需要以多进程/线程方式工作,并且您无法正常共享进程之间的连接。因此,如果您在进程上为用户创建连接,则无法保证在同一进程上接收查询请求。可能更好的主意是让一个进程后台工作人员处理多个线程(每个会话一个线程)中的连接,以对数据库进行查询并在 Web 应用程序上检索结果。您的应用程序应该为每个会话分配一个唯一的 ID,并且后台工作人员使用会话 ID 跟踪每个线程。您可以使用celery 或任何其他支持异步结果的任务队列。所以设计如下:

             |<--|        |<--------------|                   |<--|
user (id: x) |   | webapp |   | queue |   | worker (thread x) |   | DB
             |-->|        |-->|       |-->|                   |-->|

您还可以为每个用户创建一个队列,直到他们有一个活动会话,因此您可以为每个会话运行一个单独的后台进程。

【讨论】:

    【解决方案3】:

    我不明白为什么你需要一个缓存连接,为什么不在每个请求都重新连接缓存用户凭据的地方,但无论如何我会尝试概述一个可能符合你要求的解决方案。

    我建议先研究一个更通用的任务 - 在您的应用需要处理的后续请求之间缓存一些内容,并且无法序列化到 django 的会话中。 在您的特定情况下,此共享值将是一个 数据库连接(或多个连接)。 让我们从一个简单的任务开始,即在请求之间共享一个简单的计数器变量,以了解幕后实际发生的情况。

    令人惊讶的是,两个答案都没有提到您可能使用的网络服务器! 实际上有多种方法可以在 Web 应用程序中处理并发连接:

    1. 拥有多个进程,每个请求都会随机进入其中一个
    2. 拥有多个线程,每个请求都由一个随机线程处理
    3. p.1 和 p.2 合并
    4. 各种 异步 技术,当有一个单个 进程 + 事件循环 处理请求时需要注意该请求处理程序不应长时间阻塞

    根据我自己的经验,p.1-2 适用于大多数典型的 web 应用程序。 Apache1.x 只能处理 p.1,Apache2.x 可以处理所有 1-3。

    让我们从以下django 应用程序开始,并运行一个单进程gunicorn 网络服务器。 我将使用gunicorn,因为它不像apache 那样容易配置(个人意见:-)

    views.py

    import time
    
    from django.http import HttpResponse
    
    c = 0
    
    def main(self):
        global c
        c += 1
        return HttpResponse('val: {}\n'.format(c))
    
    
    def heavy(self):
        time.sleep(10)
        return HttpResponse('heavy done')
    

    urls.py

    from django.contrib import admin
    from django.urls import path
    
    from . import views
    
    urlpatterns = [
        path('admin/', admin.site.urls),
        path('', views.main, name='main'),
        path('heavy/', views.heavy, name='heavy')
    ]
    

    以单进程模式运行:

    gunicorn testpool.wsgi -w 1
    

    这是我们的进程树 - 只有 1 个工作人员可以处理所有请求

    pstree 77292
    -+= 77292 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 1
     \--- 77295 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 1
    

    尝试使用我们的应用:

    curl 'http://127.0.0.1:8000'
    val: 1
    
    curl 'http://127.0.0.1:8000'
    val: 2
    
    curl 'http://127.0.0.1:8000'
    val: 3
    

    如您所见,您可以轻松地在后续请求之间共享计数器。 这里的问题是您只能并行处理一个请求。如果您在一个标签中请求 /heavy/,则 / 将在 /heavy 完成之前起作用

    现在让我们使用 2 个工作进程:

    gunicorn testpool.wsgi -w 2
    

    这是进程树的样子:

     pstree 77285
    -+= 77285 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 2
     |--- 77288 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 2
     \--- 77289 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 2
    

    测试我们的应用:

    curl 'http://127.0.0.1:8000'
    val: 1
    
    curl 'http://127.0.0.1:8000'
    val: 2
    
    curl 'http://127.0.0.1:8000'
    val: 1
    

    前两个请求已由第一个 worker process 处理,第三个 - 由第二个拥有自己内存空间的工作进程处理,因此您会看到 1 而不是 3。 请注意,您的输出可能会有所不同,因为进程 1 和 2 是随机选择的。但迟早你会遇到一个不同的流程。

    这对我们来说不是很有帮助,因为我们需要处理多个并发请求,并且我们需要以某种方式让我们的请求由通常无法完成的特定进程处理。

    如果您的请求由不同的进程处理 - 需要建立新的连接。

    让我们转到线程

    gunicorn testpool.wsgi -w 1 --threads 2
    

    再次 - 只有 1 个进程

    pstree 77310
    -+= 77310 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 1 --threads 2
     \--- 77313 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 1 --threads 2
    

    现在,如果您在一个标签中运行 /heavy,您仍然可以查询 /,并且您的计数器将在请求之间保留 ! 即使线程数量根据您的工作负载增加或减少,它仍然可以正常工作。

    问题:您需要使用 python 线程同步技术 (read more) 像这样同步访问共享变量。 另一个问题是同一用户可能需要并行发出多个查询 - 即打开多个选项卡。

    要处理它,当您有可用的数据库凭据时,您可以在第一次请求时打开 多个 连接。

    如果用户需要的连接多于您的应用可能会等待锁定,直到连接可用。

    回到你的问题

    您可以创建一个具有以下方法的类:

    from contextlib import contextmanager
    
    class ConnectionPool(object):
    
       def __init__(self, max_connections=4):
          self._pool = dict()
          self._max_connections = max_connections
    
       def preconnect(self, session_id, user, password):
           # create multiple connections and put them into self._pool
           # ...
    
        @contextmanager
        def get_connection(sef, session_id):
           # if have an available connection:
                # mark it as allocated
                # and return it
                try:
                    yield connection
               finally:
                  # put it back to the pool
                  # ....
           # else
            # wait until there's a connection returned to the pool by another thread
    
    pool = ConnectionPool(4)
    
    def some_view(self):
         session_id = ...
         with pool.get_connection(session_id) as conn:
            conn.query(...)
    

    这不是一个完整的解决方案 - 您需要以某种方式删除长时间未使用的过时连接。

    如果用户在很长一段时间后回来并且他的连接已关闭,他将需要再次提供他的凭据 - 希望从您的应用的角度来看没问题。

    另外请记住 python threads 有其性能损失,不确定这是否对您来说是个问题。

    我还没有检查过apache2(配置负担太大,我已经很久没有使用它并且通常使用uwsgi),但它也应该在那里工作 - 很高兴收到你的回复 如果你设法运行它)

    也不要忘记 p.4(异步方法) - 您不太可能在 apache 上使用它,但值得研究 - 关键字:django + gevent,django + asyncio。它有其优点/缺点,并且可能会极大地影响您的应用实施,因此如果不详细了解您的应用需求,就很难提出任何解决方案

    【讨论】:

      【解决方案4】:

      我只是在这里分享我的知识。

      安装 PyMySQL 以使用 MySql

      对于 Python 2.x

      pip install PyMySQL
      

      对于 Python 3.x

      pip3 install PyMySQL
      

      1.如果您愿意使用 Django 框架,那么无需重新连接即可轻松运行 SQL 查询。

      在 setting.py 文件中添加以下行

      DATABASES = {
              'default': {
                  'ENGINE': 'django.db.backends.mysql',
                  'NAME': 'test',
                  'USER': 'test',
                  'PASSWORD': 'test',
                  'HOST': 'localhost',
                  'OPTIONS': {'charset': 'utf8mb4'},
              }
          }
      

      在views.py 文件中添加这些行来获取数据。您可以根据需要自定义查询

      from django.db import connection
      def connect(request):
          cursor = connection.cursor()
          cursor.execute("SELECT * FROM Tablename");
          results = cursor.fetchall()
          return results 
      

      你会得到想要的结果。

      点击here了解更多信息

      2。对于 python Tkinter

      from Tkinter import *
      import MySQLdb
      
      db = MySQLdb.connect("localhost","root","root","test")
      # prepare a cursor object using cursor() method
      cursor = db.cursor()
      cursor.execute("SELECT * FROM Tablename")
      if cursor.fetchone() is not None:
          print("In If")
      else:
          print("In Else")
      cursor.close()
      

      更多信息请参考this

      PS:您可以查看此链接以了解您的问题,以便以后重用数据库连接。

      How to enable MySQL client auto re-connect with MySQLdb?

      【讨论】:

        【解决方案5】:

        我不是该领域的专家,但我相信 PgBouncer 会为您完成这项工作,前提是您能够使用 PostgreSQL 后端(这是您没有明确说明的一个细节)。 PgBouncer 是一个连接池,它允许你重复使用连接,避免每次请求连接的开销。

        根据他们的documentation

        用户、密码

        如果设置了 user=,所有到目标数据库的连接都将使用指定的用户完成,这意味着该数据库将只有一个池。

        否则 PgBouncer 会尝试使用客户端用户名登录目标数据库,这意味着每个用户将有一个池。

        因此,您可以为每个用户拥有一个连接池,这听起来就像您想要的那样。

        在 MySQL 领域,mysql.connector.pooling 模块允许您进行一些连接池,但我不确定您是否可以进行每用户池。鉴于您可以设置池名称,我猜您可以使用用户名来标识池。

        无论您使用什么,您都可能遇到无法避免重新连接的情况(用户连接、做一些事情、去开会和吃午饭、回来并想采取更多行动)。

        【讨论】:

          【解决方案6】:

          您可以使用 IoC 容器为您存储单例提供程序。本质上,它不会每次都构建一个新的连接,它只会构建一次(第一次调用ConnectionContainer.connection_provider()),之后它总是会返回之前构建的连接。

          您需要 dependency-injector 包才能使我的示例工作:

          import dependency_injector.containers as containers
          import dependency_injector.providers as providers
          
          
          class ConnectionProvider():
              def __init__(self, host, user, passwd, db, charset):
                  self.conn = MySQLdb.connect(
                      host=host,
                      user=user,
                      passwd=passwd,
                      db=db,
                      charset=charset
                  )
          
          
          class ConnectionContainer(containers.DeclarativeContainer):
              connection_provider = providers.Singleton(ConnectionProvider,
                                                        host='aaa',
                                                        user='bbb',
                                                        passwd='ccc',
                                                        db='ddd',
                                                        charset='utf8')
          
          
          def do_queries(request, sql):
              user = request.user
              conn = ConnectionContainer.connection_provider().conn
              cursor = conn.cursor()
              cursor.execute(sql)
          

          我在这里对连接字符串进行了硬编码,但也可以根据可更改的配置使其可变。在这种情况下,您还可以为配置文件创建一个容器,并让连接容器从那里读取其配置。然后在运行时设置配置。如下:

          import dependency_injector.containers as containers
          import dependency_injector.providers as providers
          
          class ConnectionProvider():
              def __init__(self, connection_config):
                  self.conn = MySQLdb.connect(**connection_config)
          
          class ConfigContainer(containers.DeclarativeContainer):
              connection_config = providers.Configuration("connection_config")
          
          class ConnectionContainer(containers.DeclarativeContainer):
              connection_provider = providers.Singleton(ConnectionProvider, ConfigContainer.connection_config)
          
          def do_queries(request, sql):
              user = request.user
              conn = ConnectionContainer.connection_provider().conn
              cursor = conn.cursor()
              cursor.execute(sql)
          
          
          # run code
          my_config = {
              'host':'aaa',
              'user':'bbb',
              'passwd':'ccc',
              'db':'ddd',
              'charset':'utf8'
          }
          
          ConfigContainer.connection_config.override(my_config)
          request = ...
          sql = ...
          
          do_queries(request, sql)
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2013-05-25
            • 2019-01-19
            • 2023-01-28
            相关资源
            最近更新 更多