【发布时间】:2011-09-30 11:49:39
【问题描述】:
有没有优雅的方法在 SQLAlchemy 中执行INSERT ... ON DUPLICATE KEY UPDATE?我的意思是语法类似于 inserter.insert().execute(list_of_dictionaries) 的东西?
【问题讨论】:
标签: python mysql sqlalchemy
有没有优雅的方法在 SQLAlchemy 中执行INSERT ... ON DUPLICATE KEY UPDATE?我的意思是语法类似于 inserter.insert().execute(list_of_dictionaries) 的东西?
【问题讨论】:
标签: python mysql sqlalchemy
ON DUPLICATE KEY UPDATE 发布 MySQL 1.2 版此功能现在仅内置在 SQLAlchemy for MySQL 中。以下 somada141 的回答有最好的解决方案: https://stackoverflow.com/a/48373874/319066
ON DUPLICATE KEY UPDATE
如果您希望生成的 SQL 实际包含 ON DUPLICATE KEY UPDATE,最简单的方法是使用 @compiles 装饰器。
可以在on github 找到示例代码(链接自主题on reddit 的好线程):
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def append_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if 'append_string' in insert.kwargs:
return s + " " + insert.kwargs['append_string']
return s
my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)
但请注意,在这种方法中,您必须手动创建 append_string。您可能可以更改 append_string 函数,以便它自动将插入字符串更改为带有“ON DUPLICATE KEY UPDATE”字符串的插入,但由于懒惰,我不会在这里这样做。
ON DUPLICATE KEY UPDATE ORM 中的功能SQLAlchemy 不提供到 ON DUPLICATE KEY UPDATE 或 MERGE 的接口,或者其 ORM 层中的任何其他类似功能。不过,它具有session.merge() 函数,只有在相关键是主键时才能复制该功能。
session.merge(ModelObject) 首先通过发送SELECT 查询(或在本地查找)检查是否存在具有相同主键值的行。如果是这样,它会在某处设置一个标志,表明 ModelObject 已经在数据库中,并且 SQLAlchemy 应该使用UPDATE 查询。请注意,merge 比这复杂得多,但它使用主键很好地复制了功能。
但是,如果您希望 ON DUPLICATE KEY UPDATE 具有非主键(例如,另一个唯一键)的功能怎么办?不幸的是,SQLAlchemy 没有任何这样的功能。相反,您必须创建类似于 Django 的get_or_create() 的东西。 Another StackOverflow answer covers it,为方便起见,我将在此处粘贴修改后的工作版本。
def get_or_create(session, model, defaults=None, **kwargs):
instance = session.query(model).filter_by(**kwargs).first()
if instance:
return instance
else:
params = dict((k, v) for k, v in kwargs.iteritems() if not isinstance(v, ClauseElement))
if defaults:
params.update(defaults)
instance = model(**params)
return instance
【讨论】:
append_string 代码在 postgres 上不起作用(在 9.5 中它是新的 ON CONFLICT [IGNORE|UPDATE] 功能,因为 ORM 会自动将 RETURNING {primary key} 附加到插入,这会导致无效的 SQL .
foo=foo 部分在这里做什么,我将在我自己的表中用什么替换 foo?
append_string 无效获取SAWarning: Can't validate argument 'append_string'; can't locate any SQLAlchemy dialect named 'append' % (k, dialect_name)
有一个更简单的解决方案:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def replace_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
s = s.replace("INSERT INTO", "REPLACE INTO")
return s
my_connection.execute(my_table.insert(replace_string=""), my_values)
【讨论】:
REPLACE INTO 和 INSERT ... ON DUPLICATE KEY UPDATE 做不同的事情。
InnoDB(或任何其他事务引擎)表上非常无用,因为它在大多数@987654325 上都会阻塞@约束
因为这些解决方案都不是那么优雅。一种蛮力的方法是查询该行是否存在。如果它确实删除了该行然后插入,否则只需插入。显然涉及一些开销,但它不依赖于修改原始 sql,它适用于非 orm 的东西。
【讨论】:
我只是用普通的sql作为:
insert_stmt = "REPLACE INTO tablename (column1, column2) VALUES (:column_1_bind, :columnn_2_bind) "
session.execute(insert_stmt, data)
【讨论】:
这取决于你。如果要替换,请在前缀中传递OR REPLACE
def bulk_insert(self,objects,table):
#table: Your table class and objects are list of dictionary [{col1:val1, col2:vale}]
for counter,row in enumerate(objects):
inserter = table.__table__.insert(prefixes=['OR IGNORE'], values=row)
try:
self.db.execute(inserter)
except Exception as E:
print E
if counter % 100 == 0:
self.db.commit()
self.db.commit()
这里可以更改commit间隔以加速或减速
【讨论】:
基于phsource's answer,对于使用MySQL并完全覆盖相同键的数据而不执行DELETE语句的特定用例,可以使用以下@987654324 @装饰插入表达式:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def append_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if insert.kwargs.get('on_duplicate_key_update'):
fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
return s
【讨论】:
INSERT 中的值覆盖字段(指的是字段名称而不是值),因此不需要转义。显然,使用 now-part-of-the-ORM 功能会更好(除非将它与 INSERT FROM SELECT 一起使用,它不能按预期工作)
我应该提一下,自 v1.2 版本以来,SQLAlchemy 的“核心”已经内置了上述解决方案,可以在here 下看到(复制的 sn-p 如下):
from sqlalchemy.dialects.mysql import insert
insert_stmt = insert(my_table).values(
id='some_existing_id',
data='inserted value')
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data,
status='U'
)
conn.execute(on_duplicate_key_stmt)
【讨论】:
values 也接受lists of dict 对象。
data 是如何填充的?是不是像data={'field_1'='value1'}。谢谢
我的方式
import typing
from datetime import datetime
from sqlalchemy.dialects import mysql
class MyRepository:
def model(self):
return MySqlAlchemyModel
def upsert(self, data: typing.List[typing.Dict]):
if not data:
return
model = self.model()
if hasattr(model, 'created_at'):
for item in data:
item['created_at'] = datetime.now()
stmt = mysql.insert(getattr(model, '__table__')).values(data)
for_update = []
for k, v in data[0].items():
for_update.append(k)
dup = {k: getattr(stmt.inserted, k) for k in for_update}
stmt = stmt.on_duplicate_key_update(**dup)
self.db.session.execute(stmt)
self.db.session.commit()
用法:
myrepo.upsert([
{
"field11": "value11",
"field21": "value21",
"field31": "value31",
},
{
"field12": "value12",
"field22": "value22",
"field32": "value32",
},
])
【讨论】:
其他答案已经涵盖了这一点,但我想我会参考我在this gist 中找到的另一个很好的 mysql 示例。这还包括LAST_INSERT_ID 的使用,这取决于您的 innodb 自动增量设置以及您的表是否具有唯一键,这可能很有用。此处摘取代码方便参考,如果觉得有用请给作者一个star。
from app import db
from sqlalchemy import func
from sqlalchemy.dialects.mysql import insert
def upsert(model, insert_dict):
"""model can be a db.Model or a table(), insert_dict should contain a primary or unique key."""
inserted = insert(model).values(**insert_dict)
upserted = inserted.on_duplicate_key_update(
id=func.LAST_INSERT_ID(model.id), **{k: inserted.inserted[k]
for k, v in insert_dict.items()})
res = db.engine.execute(upserted)
return res.lastrowid
【讨论】: