【问题标题】:Python, SQLite and threadingPython、SQLite 和线程
【发布时间】:2010-10-06 04:54:33
【问题描述】:

我正在开发一个应用程序,它将通过 HTTP 从多个位置收集数据,在本地缓存数据,然后通过 HTTP 提供服务。

所以我正在查看以下内容。我的应用程序将首先创建几个线程,这些线程将以指定的时间间隔收集数据并将该数据本地缓存到 SQLite 数据库中。

然后在主线程中启动一个 CherryPy 应用程序,该应用程序将查询该 SQLite 数据库并提供数据。

我的问题是:如何处理从我的线程和 CherryPy 应用程序到 SQLite 数据库的连接?

如果我将每个线程连接到数据库,我是否也能够创建/使用内存数据库?

【问题讨论】:

    标签: python multithreading sqlite


    【解决方案1】:

    简答:不要在线程应用程序中使用 Sqlite3。

    Sqlite3 数据库在规模上可以很好地扩展,但在并发方面却非常糟糕。您将受到“数据库已锁定”错误的困扰。

    如果这样做,您将需要每个线程都有一个连接,并且您必须确保这些连接自行清理。这传统上使用线程本地会话处理,并且使用 SQLAlchemy 的 ScopedSession 执行得相当好(例如)。如果我是你,我会使用它,即使你没有使用 SQLAlchemy ORM 功能。

    【讨论】:

    • 同意。对 Sqlite3 数据库的多个并发写入只是自找麻烦。不过并发读取没问题...
    • 并发写入不同表怎么样,我对 SQLite 有点陌生,但来自 postgres 背景,它在那里有 MVCC,我理解 SQLite 是在本地使用,没有客户端服务器连接的概念,但是如果我有 10 个表并且我想并行写入它们,我可以这样做吗?
    【解决方案2】:

    正在进行此测试以确定从 SQLite 数据库写入和读取的最佳方式。我们遵循以下 3 种方法

    1. 不带任何线程的读写(上面有normal的方法)
    2. 使用线程读写
    3. 通过进程读写

    我们的示例数据集是一个虚拟生成的 OHLC 数据集,其中包含一个符号、时间戳以及 ohlc 和 volumefrom、volumeto 的 6 个假值

    读取

    1. 普通方法读取大约需要 0.25 秒
    2. 线程化方法需要 10 秒
    3. 读取处理需要 0.25 秒

    获胜者:处理和正常

    1. 普通方法大约需要 1.5 秒来写
    2. 线程方法大约需要 30 秒
    3. 处理大约需要 30 秒

    获胜者:普通

    注意:并非所有记录都使用线程和处理写入方法写入。当写入排队时,线程化和处理过的写入方法显然会遇到数据库锁定错误 SQlite 仅将写入排队到某个阈值,然后抛出 sqlite3.OperationalError 指示数据库已锁定。 理想的方法是再次重试插入相同的块,但没有意义,因为并行插入的方法执行比顺序读取需要更多的时间,即使没有重试 锁定/失败的插入 在不重试的情况下,97% 的行已被写入,并且仍然比顺序写入多花费 10 倍的时间

    外卖策略:

    1. 更喜欢读取 SQLite 并将其写入同一线程

    2. 如果你必须做多线程,使用多处理读取性能或多或少相同的,并遵循单线程写入操作

    3. 请勿使用线程进行读取和写入,因为两者的速度都慢了 10 倍,您可以为此感谢 GIL

    这是完整测试的代码

    import sqlite3
    import time
    import random
    import string
    import os
    import timeit
    from functools import wraps
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import threading
    import os
    
    database_file = os.path.realpath('../files/ohlc.db')
    
    create_statement = 'CREATE TABLE IF NOT EXISTS database_threading_test (symbol TEXT, ts INTEGER, o REAL, h REAL, l REAL, c REAL, vf REAL, vt REAL, PRIMARY KEY(symbol, ts))'
    insert_statement = 'INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)'
    select = 'SELECT * from database_threading_test'
    
    def time_stuff(some_function):
        def wrapper(*args, **kwargs):
            t0 = timeit.default_timer()
            value = some_function(*args, **kwargs)
            print(timeit.default_timer() - t0, 'seconds')
            return value
        return wrapper
    
    def generate_values(count=100):
        end = int(time.time()) - int(time.time()) % 900
        symbol = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
        ts = list(range(end - count * 900, end, 900))
        for i in range(count):
            yield (symbol, ts[i], random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1e9, random.random() * 1e5)
    
    def generate_values_list(symbols=1000,count=100):
        values = []
        for _ in range(symbols):
            values.extend(generate_values(count))
        return values
    
    @time_stuff
    def sqlite_normal_read():
        """
    
        100k records in the database, 1000 symbols, 100 rows
        First run
        0.25139795300037804 seconds
        Second run
    
        Third run
        """
        conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
        try:
            with conn:
                conn.execute(create_statement)
                results = conn.execute(select).fetchall()
                print(len(results))
        except sqlite3.OperationalError as e:
            print(e)
    
    @time_stuff
    def sqlite_normal_write():
        """
        1000 symbols, 100 rows
        First run
        2.279409104000024 seconds
        Second run
        2.3364172020001206 seconds
        Third run
        """
        l = generate_values_list()
        conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
        try:
            with conn:
                conn.execute(create_statement)
                conn.executemany(insert_statement, l)
    
        except sqlite3.OperationalError as e:
            print(e)
    
    @time_stuff
    def sequential_batch_read():
        """
        We read all the rows for each symbol one after the other in sequence
        First run
        3.661222331999852 seconds
        Second run
        2.2836898810001003 seconds
        Third run
        0.24514851899994028 seconds
        Fourth run
        0.24082150699996419 seconds
        """
        conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
        try:
            with conn:
                conn.execute(create_statement)
                symbols = conn.execute("SELECT DISTINCT symbol FROM database_threading_test").fetchall()
                for symbol in symbols:
                    results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
        except sqlite3.OperationalError as e:
            print(e)  
    
    
    
    def sqlite_threaded_read_task(symbol):
        results = []
        conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
        try:
            with conn:
                results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
        except sqlite3.OperationalError as e:
            print(e)
        finally:
            return results
    
    def sqlite_multiprocessed_read_task(symbol):
        results = []
        conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
        try:
            with conn:
                results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
        except sqlite3.OperationalError as e:
            print(e)
        finally:
            return results
    
    @time_stuff
    def sqlite_threaded_read():
        """
        1000 symbols, 100 rows per symbol
        First run
        9.429676861000189 seconds
        Second run
        10.18928106400017 seconds
        Third run
        10.382290903000467 seconds
        """
        conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
        symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall()
        with ThreadPoolExecutor(max_workers=8) as e:
            results = e.map(sqlite_threaded_read_task, symbols, chunksize=50)
            for result in results:
                pass
    
    @time_stuff
    def sqlite_multiprocessed_read():
        """
        1000 symbols, 100 rows
        First run
        0.2484774920012569 seconds!!!
        Second run
        0.24322178500005975 seconds
        Third run
        0.2863524549993599 seconds
        """
        conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
        symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall()
        with ProcessPoolExecutor(max_workers=8) as e:
            results = e.map(sqlite_multiprocessed_read_task, symbols, chunksize=50)
            for result in results:
                pass
    
    def sqlite_threaded_write_task(n):
        """
        We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors
        """
        conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
        data = list(generate_values())
        try:
            with conn:
                conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
        except sqlite3.OperationalError as e:
            print("Database locked",e)
        finally:
            conn.close()
            return len(data)
    
    def sqlite_multiprocessed_write_task(n):
        """
        We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors
        """
        conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
        data = list(generate_values())
        try:
            with conn:
                conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
        except sqlite3.OperationalError as e:
            print("Database locked",e)
        finally:
            conn.close()
            return len(data)
    
    @time_stuff
    def sqlite_threaded_write():
        """
    
        Did not write all the results but the outcome with 97400 rows written is still this...
        Takes 20x the amount of time as a normal write
        1000 symbols, 100 rows
        First run
        28.17819765000013 seconds
        Second run
        25.557972323000058 seconds
        Third run
        """
        symbols = [i for i in range(1000)]
        with ThreadPoolExecutor(max_workers=8) as e:
            results = e.map(sqlite_threaded_write_task, symbols, chunksize=50)
            for result in results:
                pass
    
    @time_stuff
    def sqlite_multiprocessed_write():
        """
        1000 symbols, 100 rows
        First run
        30.09209805699993 seconds
        Second run
        27.502465319000066 seconds
        Third run
        """
        symbols = [i for i in range(1000)]
        with ProcessPoolExecutor(max_workers=8) as e:
            results = e.map(sqlite_multiprocessed_write_task, symbols, chunksize=50)
            for result in results:
                pass
    
    
    sqlite_normal_write()
    

    【讨论】:

      【解决方案3】:

      你可以使用that之类的东西。

      【讨论】:

        【解决方案4】:

        "...创建多个线程,以指定的时间间隔收集数据并将该数据本地缓存到 sqlite 数据库中。 然后在主线程中启动一个 CherryPy 应用程序,该应用程序将查询该 sqlite 数据库并提供数据。”

        不要在线程上浪费大量时间。您所描述的只是操作系统进程。只需启动普通进程来收集和运行 Cherry Py。

        为此,您没有真正使用单个进程中的并发线程。以指定的时间间隔收集数据——当使用简单的操作系统进程完成时——可以由操作系统非常简单地安排。例如,Cron 在这方面做得很好。

        CherryPy 应用程序也是一个操作系统进程,而不是某个更大进程的单个线程。

        只使用进程——线程不会帮助你。

        【讨论】:

          【解决方案5】:

          根据应用程序,数据库可能是一个真正的开销。如果我们谈论的是易失性数据,也许您可​​以完全跳过通过 DB 进行的通信,并通过 IPC 在数据收集过程数据服务过程之间共享数据.当然,如果数据必须被持久化,这不是一个选项。

          【讨论】:

            【解决方案6】:

            根据数据速率,sqlite 可能是执行此操作的正确方法。每次写入都会锁定整个数据库,因此您不会扩展到每秒 1000 次同时写入。但是,如果您只有几个,这是确保您不会互相覆盖的最安全方法。

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 2013-12-06
              • 2023-03-31
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多