【Question Title】: multithreading to load data into sqlite db
【Posted】: 2021-05-04 23:59:21
【Question】:

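I am downloading data from an API and storing it in an SQLite database. I want to implement this process with multithreading. Can someone help me implement it?

I found a library, but I get an error. The code is below.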

import sqlite3
import os

import pandas as pd
from sodapy import Socrata

import concurrent.futures

dbPath = 'folder where db exists'
dbName = 'db file name'

## Set up the connection to the DB (check_same_thread=False lets other threads use it)
dbConn = sqlite3.connect(os.path.join(dbPath, dbName), check_same_thread=False)

## Set up the Socrata API client (no app token)
client = Socrata("health.data.ny.gov", None)

## Define all the counties to be processed by the threads
countys = [...]  # placeholder: the names of all 62 counties in New York

## A separate state dict per county, so threads don't overwrite each other's entries
varDict = {county: {} for county in countys}
strDataList = ['test_date', 'LoadDate']
intDataList = ['new_positives', 'cumulative_number_of_positives', 'total_number_of_tests', 'cumulative_number_of_tests']


def getData(county):
    
    ## Check if table exists
    print("Processing ", county)
    varDict[county]['dbCurs'] = dbConn.cursor()
    varDict[county]['select'] = varDict[county]['dbCurs'].execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (county,))
    if not len(varDict[county]['select'].fetchall()):
        createTable(county)
    
    whereClause = 'county="'+county+'"'
    varDict[county]['results'] = client.get("xdss-u53e", where=whereClause)
    varDict[county]['data'] = pd.DataFrame.from_records(varDict[county]['results'])
    varDict[county]['data'].drop(['county'], axis=1, inplace=True)
    varDict[county]['data']['LoadDate'] = pd.to_datetime('now')
    varDict[county]['data'][strDataList] = varDict[county]['data'][strDataList].astype(str)
    varDict[county]['data']['test_date'] = varDict[county]['data']['test_date'].apply(lambda x: x[:10])
    varDict[county]['data'][intDataList] = varDict[county]['data'][intDataList].astype(int)
    varDict[county]['data'] = varDict[county]['data'].values.tolist()

    ## Insert values into SQLite
    varDict[county]['sqlQuery'] = 'INSERT INTO ['+county+'] VALUES (?,?,?,?,?,?)'
    varDict[county]['dbCurs'].executemany(varDict[county]['sqlQuery'], varDict[county]['data'])
    dbConn.commit()
    
# for i in dbCurs.execute('SELECT * FROM albany'):
#     print(i)

def createTable(county):
    
    sqlQuery = 'CREATE TABLE ['+county+'] ( [Test Date] TEXT, [New Positives] INTEGER NOT NULL, [Cumulative Number of Positives] INTEGER NOT NULL, [Total Number of Tests Performed] INTEGER NOT NULL, [Cumulative Number of Tests Performed] INTEGER NOT NULL, [Load date] TEXT NOT NULL, PRIMARY KEY([Test Date]))'
    varDict[county]['dbCurs'].execute(sqlQuery)
    

# for _ in countys:
#     getData(_)
    
# x = countys[:5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    # results = [executor.submit(getData, y) for y in x]
    # Consume the results so that exceptions raised inside getData are re-raised here
    list(executor.map(getData, countys))
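A side note on the per-county state used in getData: dict.fromkeys(keys, value) evaluates value once and stores that same object under every key. With a mutable {} as the value, all counties share a single dict, so threads running at the same time overwrite each other's cursor, data and query entries. A minimal demonstration (the county names are only examples):

```python
counties = ["Albany", "Bronx"]  # example names only

# dict.fromkeys stores the SAME dict object under every key.
shared = dict.fromkeys(counties, {})
shared["Albany"]["rows"] = 10
print(shared["Bronx"])                       # {'rows': 10}
print(shared["Albany"] is shared["Bronx"])   # True

# A dict comprehension creates a fresh dict for each key.
separate = {county: {} for county in counties}
separate["Albany"]["rows"] = 10
print(separate["Bronx"])                     # {}
```

Building the mapping with a dict comprehension, or simply keeping the per-county values in local variables inside the function, gives each thread its own state.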

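getData is the function that pulls in the data county by county and loads it into the database. countys is the list of all counties. I was able to do this synchronously, but I want to implement multithreading. The for loop that runs it synchronously (and works) is: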

for _ in countys:
    getData(_)

The error message is:

ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 8016 and this is thread id 19844.

【Question Comments】:

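  • You can fetch the data with multiple threads, but only one thread at a time can write to an SQLite database.
  • Any idea how to change the code so that new threads don't interfere while an old thread is still working?
  • This is the insert code: sqlQuery = 'INSERT INTO ['+county+'] VALUES (?,?,?,?,?,?)' dbCurs.executemany(sqlQuery, data) dbConn.commit()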
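Following the first comment, a minimal sketch that keeps the downloads parallel but does all the writing in one thread. fetch_county is a hypothetical placeholder for the Socrata request (client.get in the question), and the single results table with a county column is a simplification of the question's one-table-per-county schema, not the asker's design.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_county(county):
    """Hypothetical stand-in for the API call; returns (county, rows)."""
    return county, [[f"2021-05-0{i}", i] for i in range(1, 4)]

def load_all(conn, counties, max_workers=8):
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results "
        "(county TEXT, test_date TEXT, new_positives INTEGER)"
    )
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Worker threads only download (and could also clean the data).
        futures = [executor.submit(fetch_county, c) for c in counties]
        # Only this thread touches the connection, so no lock is needed.
        for future in as_completed(futures):
            county, rows = future.result()  # re-raises a worker's exception
            conn.executemany(
                "INSERT INTO results VALUES (?, ?, ?)",
                [[county, *row] for row in rows],
            )
            conn.commit()

conn = sqlite3.connect(":memory:")
load_all(conn, ["Albany", "Bronx", "Erie"])
print(conn.execute("SELECT COUNT(*) FROM results").fetchone()[0])  # 9
```

Because the main thread is the only one that uses the connection, the default check_same_thread=True can stay, and future.result() surfaces any error from a download.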

Tags: python multithreading sqlite


【Solution 1】:

You may find this useful:

sqlite3.connect(":memory:", check_same_thread=False)
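If one connection really is shared, as in this answer, the Python sqlite3 documentation notes that with check_same_thread=False write operations may need to be serialized by the user. A minimal sketch of doing that with a threading.Lock; the results table and the rows written by work are hypothetical stand-ins for the downloaded data:

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

# One connection shared by all threads. check_same_thread=False only turns
# off Python's ownership check; the lock serializes access to it.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE results (county TEXT, n INTEGER)")
db_lock = threading.Lock()

def save(county, rows):
    with db_lock:  # only one thread uses the connection at a time
        conn.executemany("INSERT INTO results VALUES (?, ?)",
                         [(county, n) for n in rows])
        conn.commit()

def work(county):
    rows = range(5)  # hypothetical stand-in for the downloaded data
    save(county, rows)
    return county

with ThreadPoolExecutor(max_workers=4) as executor:
    # list() consumes the results so worker exceptions are re-raised here.
    done = list(executor.map(work, ["Albany", "Bronx", "Erie", "Kings"]))

print(conn.execute("SELECT COUNT(*) FROM results").fetchone()[0])  # 20
```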

【Comments】:

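  • I no longer get the error, but my code hangs.
  • Maybe you have an unrelated bug. Can you post the relevant lines of code?
  • I can't tell which line is actually causing the problem. I've attached the full code for your reference.
  • What exactly does this do?
  • I added comments and an overview. Can you tell me which part confuses you?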
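One pitfall worth ruling out when a ThreadPoolExecutor script seems to hang or does nothing: executor.map returns a lazy iterator, and an exception raised in a worker is re-raised only when that worker's result is retrieved. If the iterator is never consumed, the failure is silently discarded. A small demonstration with a hypothetical get_data function:

```python
from concurrent.futures import ThreadPoolExecutor

def get_data(county):
    # Hypothetical worker that fails for one input.
    if county == "bad":
        raise ValueError(f"could not load {county}")
    return county.upper()

# Results never consumed: the ValueError is silently discarded.
with ThreadPoolExecutor() as executor:
    executor.map(get_data, ["albany", "bad"])

# Consuming the iterator re-raises the worker's exception.
caught = None
with ThreadPoolExecutor() as executor:
    try:
        results = list(executor.map(get_data, ["albany", "bad"]))
    except ValueError as exc:
        caught = exc
print(caught)  # could not load bad
```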