【问题标题】:Speed up insert to SQL Server from CSV file without using BULK INSERT or pandas to_sql在不使用 BULK INSERT 或 pandas to_sql 的情况下加速从 CSV 文件插入 SQL Server
【发布时间】:2017-10-11 09:15:27
【问题描述】:

我想将 Pandas 数据框作为一个整体放在 MS SQL Server 数据库的表中。像我这样的普通用户不允许使用 BULK INSERT。我正在使用 pyodbc 连接到我的数据库。我正在使用熊猫 0.13.1。我在某处读到,从 0.14 版开始,您可以使用 to_sql 方法,因此它对我的 pandas 数据框不可用。因此我使用了一个迭代器。我的数据框有 2 列:Col1 和 Col2。

我的代码正在运行,看起来像:

from pyodbc import connect
import pandasas pd

df = pd.read_csv('PathToMyCSVfile', sep=';', header=0)

cnxn = connect(DRIVER = '{SQL Server}', SERVER = 'MyServer', DATABASE = 'MyDatabase')
cursor = cnxn.cursor()

for index, row in df.interrows():
  cursor.execute("INSERT INTO MySchema.MyTable VALUES (?,?)", df['Col1'][index], def['Col2'][index]
  cnxn.commit()

如前所述,上面的代码可以运行,但是速度很慢... 我可以做些什么来加快速度?

【问题讨论】:

  • basic pyodbc bulk insert的可能重复
  • @IgnacioVergaraKausel 我不认为它是重复的。我的服务器上不允许 BULK INSERT。此外,可能的重复提到 executemany 是一个包装器,而不是一种不同的方法,因此我认为它不会(比我使用的执行快得多)。

标签: python sql-server pandas pyodbc


【解决方案1】:

您面临的瓶颈是您的代码为 DataFrame 中的每一行发送一个 INSERT 语句。即对于一个样本数据文件

id;txt
1;alpha
2;bravo
3;charlie
4;delta
5;echo
6;foxtrot
7;golf

您需要七 (7) 次往返服务器才能发送等价的

INSERT INTO MySchema.MyTable VALUES (1,'alpha')
INSERT INTO MySchema.MyTable VALUES (2,'bravo')
INSERT INTO MySchema.MyTable VALUES (3,'charlie')
...
INSERT INTO MySchema.MyTable VALUES (7,'golf')

您可以通过使用Table Value Constructor 在一次往返中执行相同的操作来显着加快速度:

INSERT INTO MySchema.MyTable VALUES (1,'alpha'),(2,'bravo'),(3,'charlie'), ... ,(7,'golf')

下面的代码就是这样做的。当我使用具有 5000 行的文件对其进行测试时,使用 rows_per_batch=1000(最大值)运行它比使用 rows_per_batch=1(相当于您当前的方法)快大约 100 倍。

import numpy
import pandas as pd
import pyodbc
import time


class MyDfInsert:
    def __init__(self, cnxn, sql_stub, data_frame, rows_per_batch=1000):
        # NB: hard limit is 1000 for SQL Server table value constructor
        self._rows_per_batch = 1000 if rows_per_batch > 1000 else rows_per_batch

        self._cnxn = cnxn
        self._sql_stub = sql_stub
        self._num_columns = None
        self._row_placeholders = None
        self._num_rows_previous = None
        self._all_placeholders = None
        self._sql = None

        row_count = 0
        param_list = list()
        for df_row in data_frame.itertuples():
            param_list.append(tuple(df_row[1:]))  # omit zero-based row index
            row_count += 1
            if row_count >= self._rows_per_batch:
                self._send_insert(param_list)  # send a full batch
                row_count = 0
                param_list = list()
        self._send_insert(param_list)  # send any remaining rows

    def _send_insert(self, param_list):
        if len(param_list) > 0:
            if self._num_columns is None:
                # print('[DEBUG] (building items that depend on the number of columns ...)')
                # this only happens once
                self._num_columns = len(param_list[0])
                self._row_placeholders = ','.join(['?' for x in range(self._num_columns)])
                # e.g. '?,?'
            num_rows = len(param_list)
            if num_rows != self._num_rows_previous:
                # print('[DEBUG] (building items that depend on the number of rows ...)')
                self._all_placeholders = '({})'.format('),('.join([self._row_placeholders for x in range(num_rows)]))
                # e.g. '(?,?),(?,?),(?,?)'
                self._sql = f'{self._sql_stub} VALUES {self._all_placeholders}'
                self._num_rows_previous = num_rows
            params = [int(element) if isinstance(element, numpy.int64) else element
                      for row_tup in param_list for element in row_tup]
            # print('[DEBUG]    sql: ' + repr(self._sql))
            # print('[DEBUG] params: ' + repr(params))
            crsr = self._cnxn.cursor()
            crsr.execute(self._sql, params)


if __name__ == '__main__':
    conn_str = (
        'DRIVER=ODBC Driver 11 for SQL Server;'
        'SERVER=192.168.1.134,49242;'
        'Trusted_Connection=yes;'
    )
    cnxn = pyodbc.connect(conn_str, autocommit=True)
    crsr = cnxn.cursor()
    crsr.execute("CREATE TABLE #tmp (id INT PRIMARY KEY, txt NVARCHAR(50))")

    df = pd.read_csv(r'C:\Users\Gord\Desktop\Query1.txt', sep=';', header=0)

    t0 = time.time()

    MyDfInsert(cnxn, "INSERT INTO #tmp (id, txt)", df, rows_per_batch=1000)

    print()
    print(f'Inserts completed in {time.time() - t0:.2f} seconds.')

    cnxn.close()

【讨论】:

  • Thanx @Gord Thompson,我复制了脚本并调整了驱动程序、服务器和 csv 文件等内容。我还用格式化字符串替换了self._sql = '{0} VALUES {1}'.format(self._sql_stub, self._all_placeholders) 行,因为我使用的是 Python 2.7。这应该有效,但没有。
  • 抛出了Pyodbc-error 42000,表示参数数量超过了最大参数数量(2100)。您提到此表值构造函数的硬限制是 1,000。在您的 CSV 文件中,您有 2 列。我猜这 1,000 来自 2,100 被 2 列潜水。所以我将 2,100 除以我的 8 列 = 262.50。我用 250 代替了你的 1,000,它就像一个魅力!谢谢
  • 主要目标是加快速度。这个目标实现了:我的方法用了 1,074 秒,你的方法用了 32 秒。速度大幅提升!
  • @MA53QXR 回复:“我猜 1,000 来自 2,100 的 2 列” - 不,T-SQL 表值构造函数限制为 1000 行,无论列数。参数数量的限制是不同的限制,很可能是由于 SQL 语句最终由系统存储过程执行(可能是sp_prepexec)。
猜你喜欢
  • 1970-01-01
  • 2013-01-13
  • 1970-01-01
  • 2016-11-29
  • 1970-01-01
  • 1970-01-01
  • 2014-09-16
  • 2011-10-31
  • 2013-06-06
相关资源
最近更新 更多