【Question title】: How to execute multiple SQL commands at once in pd.read_sql_query?
【Posted】: 2020-12-15 03:56:14
【Question description】:

Let me set up a use case to discuss.

CREATE  DATABASE sample;
USE sample;
CREATE TABLE quote (
  `id` int(2) unsigned NOT NULL AUTO_INCREMENT,
  `code` text ,
  `date` date DEFAULT NULL,
  `close` double DEFAULT NULL,
  PRIMARY KEY (`id`)
) ;

INSERT INTO quote (`code`, `date`, `close`)
VALUES ('epm', '20200824', 2.64); 
INSERT INTO quote (`code`, `date`, `close`)
VALUES ('dss', '20200824', 6.4); 

Executing a single SQL command with SQLAlchemy is simple.

import pandas as pd 
from sqlalchemy import create_engine
user = 'root'
mysql_pass = 'your mysql passwd'
mysql_ip = '127.0.0.1'
engine = create_engine("mysql+pymysql://{}:{}@{}:3306".format(user,mysql_pass,mysql_ip))
cmd_one_line_sql = 'select * from sample.quote;'
df = pd.read_sql_query(cmd_one_line_sql,con = engine)
df 
   id code        date  close
0   1  epm  2020-08-24   2.64
1   2  dss  2020-08-24   6.40

I get the result I want. Now suppose the command contains several SQL statements; to keep it simple, it contains only two:

cmd_multi_lines_sql = 'use sample;select * from quote;'

cmd_multi_lines_sql simply splits cmd_one_line_sql into two statements.
I rewrote the code snippet following this reference:
execute many sql commands with sqlalchemy

import pandas as pd
from sqlalchemy import create_engine
user = 'root'
mysql_pass = 'your mysql passwd'
mysql_ip = '127.0.0.1'
engine = create_engine("mysql+pymysql://{}:{}@{}:3306".format(user,mysql_pass,mysql_ip))
connection = engine.raw_connection()
cmd_multi_lines_sql = 'use sample;select * from quote;'
try:
    cursor = connection.cursor()
    cursor.execute(cmd_multi_lines_sql)
    results_one = cursor.fetchall()
finally:
    connection.close()

I get the following error:

Traceback (most recent call last):
  File "<stdin>", line 3, in <module>
  File "/usr/local/lib/python3.5/dist-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/usr/local/lib/python3.5/dist-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 517, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 732, in _read_query_result
    result.read()
  File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 1075, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 684, in _read_packet
    packet.check_error()
  File "/usr/local/lib/python3.5/dist-packages/pymysql/protocol.py", line 220, in check_error
    err.raise_mysql_exception(self._data)
  File "/usr/local/lib/python3.5/dist-packages/pymysql/err.py", line 109, in raise_mysql_exception
    raise errorclass(errno, errval)
pymysql.err.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'select * from quote' at line 1")

Another attempt:

vim  /tmp/test.sql
use sample;
select * from quote;

# the commands above are stored in /tmp/test.sql
with open('/tmp/test.sql', 'r') as f:
    cmd = f.read()
df = pd.read_sql_query(cmd, con=engine)

It produces the same error. How can I fix it?

【Comments】:

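  • If you did not edit the error message you received, then your cmd variable (or text file) evidently contains the string "cmd commands here". If you did edit it, it is hard to help without knowing what the actual SQL error is.

Tags: mysql python-3.x sqlalchemy pymysql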


【Solution 1】:

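After some research and asking on GitHub, the answer turned out to be straightforward.

You need to pass the required argument through create_engine's

connect_args=

parameter, which SQLAlchemy hands on to the driver when it connects:

{"client_flag": MULTI_STATEMENTS}

So your Python code looks like this: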

from sqlalchemy import create_engine
import pymysql
from pymysql.constants.CLIENT import MULTI_STATEMENTS
user = 'root'
mysql_pass = 'testpassword'
mysql_ip = 'localhost'
cmd = 'SELECT * FROM table1;SELECT * FROM test'

engine = create_engine("mysql+pymysql://{}:{}@{}:3306/testdb1?charset=utf8".format(user,mysql_pass,mysql_ip),connect_args={"client_flag": MULTI_STATEMENTS})
connection = engine.raw_connection()

try:
    cursor = connection.cursor()
    cursor.execute(cmd)
    results_one = cursor.fetchall()
    cursor.nextset()
    results_two = cursor.fetchall()
    cursor.close()
finally:
    connection.close()

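With this solution, however, you need to know in advance which queries you are running.

If you want more flexibility, for example with dynamic SQL statements, split the string and execute the statements one at a time: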

from sqlalchemy import create_engine
user = 'root'
mysql_pass = 'testpassword'
mysql_ip = 'localhost'
cmd = 'SELECT * FROM table1;SELECT * FROM test'
engine = create_engine("mysql+pymysql://{}:{}@{}:3306/testdb1?charset=utf8".format(user,mysql_pass,mysql_ip))
connection = engine.raw_connection()
splitstring = cmd.split(";")
ges_resultset = []
try:
    cursor = connection.cursor()
    for cmdoneonly in splitstring:
        cursor.execute(cmdoneonly)
        results = cursor.fetchall()
        ges_resultset.append(results) 
    cursor.close()
finally:
    connection.close()

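Inside the loop you can inspect each command and decide how Python should handle it:

  • SELECT returns a result set that you need to fetch
  • INSERT, DELETE and CREATE do not (there are more, but you get the gist)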
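
One driver-independent way to tell the two cases apart is the DB-API attribute cursor.description, which PEP 249 defines as None for operations that do not return rows. The following is only a minimal sketch of that idea, not the answer's own code: run_statements is a hypothetical helper, and the standard-library sqlite3 module stands in for MySQL so that the example runs without a server. With PyMySQL the cursor would come from connection.cursor() as in the code above.

```python
import sqlite3


def run_statements(cursor, statements):
    """Execute statements one by one; collect (columns, rows) only for
    statements that actually produced a result set."""
    results = []
    for stmt in statements:
        if not stmt.strip():
            continue  # skip empty fragments left over from splitting
        cursor.execute(stmt)
        # PEP 249: description is None when the statement returned no rows
        # (CREATE, INSERT, DELETE, ...), so there is nothing to fetch.
        if cursor.description is not None:
            columns = [col[0] for col in cursor.description]
            results.append((columns, cursor.fetchall()))
    return results


# sqlite3 is an assumption made only to keep the sketch self-contained.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
out = run_statements(cur, [
    "CREATE TABLE quote (id INTEGER PRIMARY KEY, code TEXT, close REAL)",
    "INSERT INTO quote (code, close) VALUES ('epm', 2.64), ('dss', 6.4)",
    "SELECT code, close FROM quote ORDER BY id",
])
conn.close()
```

Only the SELECT contributes an entry to out; each entry can be turned into a DataFrame with pd.DataFrame(rows, columns=columns).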

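【Comments】:

  • See my comment to another answer regarding splitting an anonymous code block on semicolons. It does not always work, at least not for T-SQL.
  • This is a question about MySQL and PyMySQL, and both work as expected. For SQL Server I could not find any solution that supports multiple queries at all, but feel free to enlighten me.
  • SQL Server supports anonymous code blocks by default (at least in every case I have come across), so there is no need to enable "multi statements" explicitly.
  • Enabling it is specific to PyMySQL and differs from driver to driver. I doubt we could come up with generic ad-hoc code, and that would be off-topic here anyway.
  • How can I send you a 250-reputation bounty?
【Solution 2】: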

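The problems you are facing are:

  1. You need to pass the MULTI_STATEMENTS flag to PyMySQL, and
  2. read_sql_query assumes that the first result set contains the data for the DataFrame, which may not be true for an anonymous code block.

You can create your own PyMySQL connection and retrieve the data like this: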

import pandas as pd
import pymysql
from pymysql.constants import CLIENT

conn_info = {
    "host": "localhost",
    "port": 3307,
    "user": "root",
    "password": "toot",
    "database": "mydb",
    "client_flag": CLIENT.MULTI_STATEMENTS,
}

cnxn = pymysql.connect(**conn_info)
crsr = cnxn.cursor()

sql = """\
CREATE TEMPORARY TABLE tmp (id int primary key, txt varchar(20)) 
    ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
INSERT INTO tmp (id, txt) VALUES (1, 'foo'), (2, 'ΟΠΑ!');
SELECT id, txt FROM tmp;
"""
crsr.execute(sql)

num_tries = 5
result = None
for i in range(num_tries):
    result = crsr.fetchall()
    if result:
        break
    crsr.nextset()

if not result:
    print(f"(no result found after {num_tries} attempts)")
else:
    df = pd.DataFrame(result, columns=[x[0] for x in crsr.description])
    print(df)
    """console output:
       id   txt
    0   1   foo
    1   2  ΟΠΑ!
    """

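(Edit) Additional notes:

Note 1: As mentioned in another answer, you can use the connect_args argument of SQLAlchemy's create_engine function to pass the MULTI_STATEMENTS flag. If you need a SQLAlchemy Engine object for other things (e.g., to_sql), that may be preferable to creating your own PyMySQL connection directly.

Note 2: num_tries can be arbitrarily large; it is only a way to avoid an endless loop. If we need to skip the first n empty result sets, we have to call nextset that many times anyway, and as soon as we find a non-empty result set we break out of the loop.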
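
An alternative to a fixed num_tries is to let the cursor say when it is finished: PEP 249 specifies that nextset() returns None once there are no more result sets. The sketch below assumes a driver that follows that contract and sets description to None for statements without rows. iter_result_sets is a hypothetical helper, and FakeCursor is a made-up stand-in that mimics only the three cursor members used here, so the example runs without a MySQL server.

```python
def iter_result_sets(cursor):
    """Yield (columns, rows) for every result set that has columns,
    stopping when nextset() reports that nothing is left."""
    while True:
        if cursor.description is not None:
            columns = [col[0] for col in cursor.description]
            yield columns, list(cursor.fetchall())
        if not cursor.nextset():
            break


class FakeCursor:
    """Hypothetical stand-in for an already-executed DB-API cursor."""

    def __init__(self, result_sets):
        self._sets = result_sets  # list of (column names or None, rows)
        self._pos = 0

    @property
    def description(self):
        cols = self._sets[self._pos][0]
        return None if cols is None else [(c,) for c in cols]

    def fetchall(self):
        return self._sets[self._pos][1]

    def nextset(self):
        if self._pos + 1 >= len(self._sets):
            return None  # no more result sets
        self._pos += 1
        return True


# Mimics CREATE, INSERT, SELECT: two statements without rows, then one with.
fake = FakeCursor([(None, []), (None, []), (["id", "txt"], [(1, "foo"), (2, "bar")])])
sets = list(iter_result_sets(fake))
```

With a real connection, the call would be list(iter_result_sets(crsr)) right after crsr.execute(sql), and each (columns, rows) pair can be passed to pd.DataFrame(rows, columns=columns).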

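【Comments】:

  • Great! df = pd.DataFrame(result, columns=[x[0] for x in crsr.description]) is very nice!
  • I hope the SQLAlchemy developers add support for MULTI_STATEMENTS SQL.
  • How can I send you a 250-reputation bounty?
  • @showkey - I appreciate the thought, but don't worry about it. Glad you found a satisfactory solution to your problem.
【Solution 3】:

@Gord Thompson, I made a small improvement that sets num_tries automatically: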

import pandas as pd
import pymysql
from pymysql.constants import CLIENT

conn_info = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "your mysql passwd",
    "client_flag": CLIENT.MULTI_STATEMENTS,
}

cnxn = pymysql.connect(**conn_info)
crsr = cnxn.cursor()

sql = """\
create database sample;
USE sample;
CREATE TEMPORARY TABLE tmp (id int primary key, txt varchar(20)) 
    ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
INSERT INTO tmp (id, txt) VALUES (1, 'foo'), (2, 'ΟΠΑ!');
SELECT id, txt FROM tmp;
SELECT txt FROM tmp;
"""

crsr.execute(sql)
# count the non-empty statements; the trailing newline after the last ';' would defeat sql.endswith(';')
num_tries = len([s for s in sql.split(';') if s.strip()])


for i in range(num_tries):
    result = crsr.fetchall()
    if result:
        df = pd.DataFrame(result, columns=[x[0] for x in crsr.description])
        print(df)
    crsr.nextset()

@nbk: when cmd contains many SQL statements, running your code may hit this problem:

pymysql.err.InternalError: (1065, 'Query was empty')

Here is a small improvement based on your code:

import pandas as pd
from sqlalchemy import create_engine
user = 'root'
mysql_pass = 'your mysql passwd'
mysql_ip = 'localhost'

sql = """\
create database sample;
USE sample;
CREATE TEMPORARY TABLE tmp (id int primary key, txt varchar(20)) 
    ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
INSERT INTO tmp (id, txt) VALUES (1, 'foo'), (2, 'ΟΠΑ!');
SELECT id, txt FROM tmp;
SELECT txt FROM tmp;
"""

engine = create_engine("mysql+pymysql://{}:{}@{}:3306".format(user,mysql_pass,mysql_ip))

connection = engine.raw_connection()

splitstring = sql.split(";")
try:
    cursor = connection.cursor()
    for cmdoneonly in splitstring:
        if cmdoneonly.strip():
            cursor.execute(cmdoneonly)
            results = cursor.fetchall()
            if results :
                df = pd.DataFrame(results, columns=[x[0] for x in cursor.description])
                print(df)
    cursor.close()
finally:
    connection.close()
  • A guard, if cmdoneonly.strip():, is needed to avoid the 1065 'Query was empty' error.

  • df = pd.DataFrame(results, columns=[x[0] for x in cursor.description]) is a neat statement that I learned from @Gord Thompson.
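
Splitting on every semicolon also breaks when a semicolon appears inside a string literal, such as VALUES ('a;b'). The following is a rough sketch of a slightly more careful splitter; split_statements is a hypothetical helper, not part of pandas, SQLAlchemy or PyMySQL. It only tracks single, double and backtick quotes, and it does not handle backslash escapes, SQL comments or DELIMITER changes, so treat it as an illustration rather than a parser.

```python
def split_statements(sql):
    """Split SQL text on semicolons that are outside quotes and
    drop empty statements (which would trigger error 1065)."""
    statements = []
    current = []
    quote = None  # the quote character we are currently inside, if any
    for ch in sql:
        if quote:
            current.append(ch)
            if ch == quote:
                quote = None  # closing quote ('' re-opens on the next char)
        elif ch in ("'", '"', "`"):
            quote = ch
            current.append(ch)
        elif ch == ";":
            stmt = "".join(current).strip()
            if stmt:
                statements.append(stmt)
            current = []
        else:
            current.append(ch)
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements


parts = split_statements("use sample;\nselect * from quote;\n")
tricky = split_statements("INSERT INTO t VALUES ('a;b'); SELECT 1")
```

The resulting list can replace sql.split(";") in the loop above, which also makes the separate strip() guard unnecessary.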

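【Comments】:

  • Note that some SQL dialects (e.g., T-SQL) can be quite lax about requiring statements to end with a semicolon, so simply splitting the text to count statements is not guaranteed to always work.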