【问题标题】:Pandas errors in writing chunks to database with df.to_sql()Pandas 使用 df.to_sql() 将块写入数据库时​​出错
【发布时间】:2018-02-28 22:32:36
【问题描述】:

现有数据库和预期结果:

我有一个更大的 SQLite 数据库(12gb,超过 4400 万行的表),我想在 Python3 中使用 Pandas 对其进行修改。

示例目标:我希望将这些大表之一(4400 万行)分块读入 DF,操作 DF 块,并将结果写入新表。如果可能的话,如果新表存在,我想替换它,并将每个块附加到它。

因为我的操作只添加或修改列,新表的行数应该与原始表相同。

问题:

主要问题似乎源于以下代码中的以下行:

df.to_sql(new_table, con=db, if_exists = "append", index=False)

  1. 当在下面的代码中运行这一行时,我似乎一直得到一个额外的 size=N 块,加上一个比我预期的观察结果。
  2. 此代码第一次使用新表名运行时出现错误:
 Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database
  1. 如果我随后使用相同的新表名重新运行脚本,它将针对每个块运行,并为额外的块运行 +1 行。

  2. df.to_sql() 行被注释掉时,循环运行预期的块数。

完整代码的问题测试示例:

完整代码:example.py

import pandas as pd
import sqlite3

#Helper Functions Used in Example
def ren(invar, outvar, df):
    df.rename(columns={invar:outvar}, inplace=True)
    return(df)

def count_result(c, table):
    ([print("[*] total: {:,} rows in {} table"
        .format(r[0], table)) 
        for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])


#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()
new_table = "new_table"

#Load Data in Chunks
df_generator = pd.read_sql_query("select * from test_table limit 10000;", con=db, chunksize = 5000)

for df in df_generator:
    #Functions to modify data, example
    df = ren("name", "renamed_name", df)
    print(df.shape)
    df.to_sql(new_table, con=db, if_exists = "append", index=False)


#Count if new table is created
try:
    count_result(c, new_table)
except:
    pass

1.结果当 #df.to_sql(new_table, con=db, if_exists = "append", index=False)

(问题行已注释掉):

$ python3 example.py 
(5000, 22)
(5000, 22)

这是我所期望的,因为示例代码将我的大表限制为 10k 行。

2。结果当 df.to_sql(new_table, con=db, if_exists = "append", index=False)

一个。问题行没有被注释掉

b.这是第一次使用 new_table 运行代码:

$ python3 example.py 
(5000, 22)
Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database

3.结果当 df.to_sql(new_table, con=db, if_exists = "append", index=False)

一个。问题行没有被注释掉

b.上面的代码使用 new_table 运行 第二次

$ python3 example.py 
(5000, 22)
(5000, 22)
(5000, 22)
(1, 22)
[*] total: 20,001 rows in new_table table

因此,我首先遇到的问题是第一次运行时代码中断(结果 2),其次,第二次运行时的总行数(结果 3)是我预期的两倍以上。

任何关于如何解决此问题的建议将不胜感激。

【问题讨论】:

  • 您只是想重命名 SQLite 表中的列吗?
  • @MaxU 不,重命名功能只是一个示例修改。我想跨多个列执行一些复杂的操作,这些操作在 Pandas 中比在 SQL 中更容易。
  • 试试这个:db = sqlite3.connect("test.db", isolation_level=None)
  • @MaxU,太好了,第一次工作并产生了预期的结果!我确实注意到,虽然初始(失败的代码)几乎会在第二次尝试时立即运行,但使用您的解决方案,代码几乎就像每个块都有一个 time.sleep(8) 一样存在滞后。有没有办法加快速度,或者我应该在我机器上的内存允许的情况下增加块大小?

标签: python pandas sqlite pandas-to-sql


【解决方案1】:

您可以尝试指定:

db = sqlite3.connect("test.db", isolation_level=None)
#  ---->                        ^^^^^^^^^^^^^^^^^^^^

除此之外,您可能会尝试增加您的块大小,因为否则提交之间的时间会缩短 SQLite DB - 我猜这会导致此错误...我还建议使用 PostgreSQL、MySQL/MariaDB 或类似的东西——它们更可靠,更适合这种数据库大小......

【讨论】:

    【解决方案2】:

    上述解决方案的时间延迟

    @MaxU 将isolation_level=None 添加到数据库连接的解决方案简短而实用。然而,无论出于何种原因,它都会显着减慢将每个块写入/提交到数据库的速度。例如,当我在 1200 万行的表上测试解决方案时,代码需要 6 个多小时才能完成。相反,从多个文本文件构建原始表格需要几分钟时间。

    这种见解导致了一个更快但不太优雅的解决方案,在 1200 万行的表上完成不到 7 分钟,而不是超过 6 小时。输出行与输入行匹配,解决了我原来问题中的问题。

    更快但不太优雅的解决方案

    由于从文本文件/csv 文件构建原始表并使用 SQL 脚本加载数据,我将这种方法与 Panda 的块功能相结合。基本的基本步骤如下:

    1. 连接到数据库
    2. 使用 SQL 脚本创建新表(列和顺序应与您对 pandas df 执行的任何操作相匹配)
    3. 分块读取海量表格
    4. 对于每个块,根据需要修改 df,写入 csv,使用 sql 加载 csv,然后提交更改。

    解决方案主要代码:

    import pandas as pd
    import sqlite3
    
    #Note I Used Functions I Wrote in build_db.py
    #(shown below after example solution)
    from build_db import *
    
    
    #Helper Functions Used in Example
    def lower_var(var, df):
        s = df[var].str.lower()
        df = df.drop(var, axis=1)
        df = pd.concat([df, s], axis=1)
        return(df)
    
    
    #Connect to Data
    db = sqlite3.connect("test.db")
    c = db.cursor()
    
    #create statement
    create_table(c, "create_test.sql", path='sql_clean/')
    
    #Load Data in Chunks
    df_generator = pd.read_sql_query("select * from example_table;", con=db, chunksize = 100000)
    
    for df in df_generator:
        #functions to modify data, example
        df = lower_var("name", df) #changes column order
    
        #restore df to column order in sql table
        db_order = ["cmte_id", "amndt_ind", "rpt_tp", "transaction_pgi", "image_num", "transaction_tp", \
            "entity_tp", "name", "city", "state", "zip_code", "employer", "occupation", "transaction_dt", \
            "transaction_amt", "other_id", "tran_id", "file_num", "memo_cd", "memo_text", "sub_id"]
        df = df[db_order]
    
        #write chunk to csv
        file = "df_chunk.csv"
        df.to_csv(file, sep='|', header=None, index=False)
    
        #insert chunk csv to db
        insert_file_into_table(c, "insert_test.sql", file, '|', path='sql_clean/')
        db.commit()
    
    
    #Count results
    count_result(c, "test_indiv")
    

    以上代码的导入用户函数

    #Relavant Functions in build_db.py
    
    def count_result(c, table):
        ([print("[*] total: {:,} rows in {} table"
            .format(r[0], table)) 
            for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])
    
    def create_table(cursor, sql_script, path='sql/'):
        print("[*] create table with {}{}".format(path, sql_script))
        qry = open("{}{}".format(path, sql_script), 'rU').read()
        cursor.executescript(qry)
    
    
    def insert_file_into_table(cursor, sql_script, file, sep=',', path='sql/'):
        print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
        qry = open("{}{}".format(path, sql_script), 'rU').read()
        fileObj = open(file, 'rU', encoding='latin-1')
        csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')
    
        try:
            for row in csvReader:
                try:
                    cursor.execute(qry, row)
                except sqlite3.IntegrityError as e:
                    pass
    
        except Exception as e:
            print("[*] error while processing file: {}, error code: {}".format(file, e))
            print("[*] sed replacing null bytes in file: {}".format(file))
            sed_replace_null(file, "clean_null.sh")
            subprocess.call("bash clean_null.sh", shell=True)
    
            try:
                print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
                fileObj = open(file, 'rU', encoding='latin-1')
                csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')
                for row in csvReader:
                    try:
                        cursor.execute(qry, row)
                    except sqlite3.IntegrityError as e:
                        pass
                        print(e)    
    
            except Exception as e:
                print("[*] error while processing file: {}, error code: {}".format(file, e))
    

    SQL 用户脚本

    --create_test.sql
    
    DROP TABLE if exists test_indiv;
    
    CREATE TABLE test_indiv (
        cmte_id TEXT NOT NULL,
        amndt_ind TEXT,
        rpt_tp TEXT,
        transaction_pgi TEXT,
        image_num TEXT,
        transaction_tp TEXT,
        entity_tp TEXT,
        name TEXT,
        city TEXT,
        state TEXT,
        zip_code TEXT,
        employer TEXT,
        occupation TEXT,
        transaction_dt TEXT,
        transaction_amt TEXT,
        other_id TEXT,
        tran_id TEXT,
        file_num NUMERIC,
        memo_cd TEXT,
        memo_text TEXT,
        sub_id NUMERIC NOT NULL
    );
    
    CREATE UNIQUE INDEX idx_test_indiv ON test_indiv (sub_id);
    
    --insert_test.sql
    
    INSERT INTO test_indiv (
        cmte_id,
        amndt_ind,
        rpt_tp,
        transaction_pgi,
        image_num,
        transaction_tp,
        entity_tp,
        name,
        city,
        state,
        zip_code,
        employer,
        occupation,
        transaction_dt,
        transaction_amt,
        other_id,
        tran_id,
        file_num,
        memo_cd,
        memo_text,
        sub_id
        ) 
    VALUES (
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?,
        ?
    );
    

    【讨论】:

    • 对上面的一个警告是,分块额外插入的现象实际上并没有解决,但是最终表中的总行数是一样的,如果执行的create sql 脚本有一个创建唯一索引的语句。如果没有唯一索引,则代码有问题。
    【解决方案3】:

    遇到了完全相同的问题(处理 > 30 GB 的数据)。以下是我解决问题的方法: 而不是使用 read_sql 的 Chunk 特性。我决定像这样创建一个手动块循环器:

    chunksize=chunk_size
    offset=0
    for _ in range(0, a_big_number):
        query = "SELECT * FROM the_table %s offset %s" %(chunksize, offset)
        df = pd.read_sql(query, conn)
        if len(df)!=0:
            ....
        else:
            break
    

    【讨论】:

      猜你喜欢
      • 2017-01-17
      • 1970-01-01
      • 2017-04-01
      • 1970-01-01
      • 2017-05-17
      • 1970-01-01
      • 2021-12-09
      • 2014-01-21
      • 2014-11-21
      相关资源
      最近更新 更多