【发布时间】:2021-05-04 23:59:21
【问题描述】:
我正在从 API 下载数据并将其存储在 SQLite 数据库中。我想使用“多线程”来实现这个过程。有人可以帮助我如何实现它。
我找到了一个库,但出现错误。下面是代码。
import sqlite3
import os
import pandas as pd
from sodapy import Socrata
import concurrent.futures
dbPath = 'folder where db exists'
dbName = 'db file name'
## Setup connection & cursor with the DB
dbConn = sqlite3.connect(os.path.join(dbPath, dbName), check_same_thread=False)
## Setup the API and bring in the data
client = Socrata("health.data.ny.gov", None)
## Define all the countys to be used in threading
countys = [all 62 countys in New York]
varDict = dict.fromkeys(countys, {})
strDataList = ['test_date', 'LoadDate']
intDataList = ['new_positives', 'cumulative_number_of_positives', 'total_number_of_tests', 'cumulative_number_of_tests']
def getData(county):
## Check if table exists
print("Processing ", county)
varDict[county]['dbCurs'] = dbConn.cursor()
varDict[county]['select'] = varDict[county]['dbCurs'].execute('SELECT name FROM sqlite_master WHERE type="table" AND name=?', (county,) )
if not len(varDict[county]['select'].fetchall()):
createTable(county)
whereClause = 'county="'+county+'"'
varDict[county]['results'] = client.get("xdss-u53e", where=whereClause)
varDict[county]['data'] = pd.DataFrame.from_records(varDict[county]['results'])
varDict[county]['data'].drop(['county'], axis=1, inplace=True)
varDict[county]['data']['LoadDate'] = pd.to_datetime('now')
varDict[county]['data'][strDataList] = varDict[county]['data'][strDataList].astype(str)
varDict[county]['data']['test_date'] = varDict[county]['data']['test_date'].apply(lambda x: x[:10])
varDict[county]['data'][intDataList] = varDict[county]['data'][intDataList].astype(int)
varDict[county]['data'] = varDict[county]['data'].values.tolist()
## Insert values into SQLite
varDict[county]['sqlQuery'] = 'INSERT INTO ['+county+'] VALUES (?,?,?,?,?,?)'
varDict[county]['dbCurs'].executemany(varDict[county]['sqlQuery'], varDict[county]['data'])
dbConn.commit()
# for i in dbCurs.execute('SELECT * FROM albany'):
# print(i)
def createTable(county):
sqlQuery = 'CREATE TABLE ['+county+'] ( [Test Date] TEXT, [New Positives] INTEGER NOT NULL, [Cumulative Number of Positives] INTEGER NOT NULL, [Total Number of Tests Performed] INTEGER NOT NULL, [Cumulative Number of Tests Performed] INTEGER NOT NULL, [Load date] TEXT NOT NULL, PRIMARY KEY([Test Date]))'
varDict[county]['dbCurs'].execute(sqlQuery)
# for _ in countys:
# getData(_)
# x = countys[:5]
with concurrent.futures.ThreadPoolExecutor() as executor:
# results = [executor.submit(getData, y) for y in x]
executor.map(getData, countys)
getData 是将数据县明智地引入并加载到数据库中的函数。 Countys 是所有县的列表。我能够同步完成,但想实现多线程。 同步执行它的 for 循环(有效)是
for _ in countys:
getData(_)
错误信息是
ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 8016 and this is thread id 19844.
【问题讨论】:
-
您可以使用多个线程获取数据,但一次只能有一个线程写入 SQLite 数据库。
-
知道如何更改代码,以便在旧线程正常工作时新线程不会干扰。
-
sqlQuery = 'INSERT INTO ['+county+'] VALUES (?,?,?,?,?,?)' dbCurs.executemany(sqlQuery, data) dbConn.commit() 这是插入代码
标签: python multithreading sqlite