【问题标题】:How to do an upsert with SqlAlchemy?如何使用 SqlAlchemy 进行 upsert?
【发布时间】:2011-11-02 05:16:38
【问题描述】:

我有一条记录,如果它不存在,我希望它存在于数据库中,如果它已经存在(主键存在),我希望将字段更新到当前状态。这通常称为upsert

以下不完整的代码 sn-p 演示了什么会起作用,但它似乎过于笨拙(特别是如果有更多的列)。什么是更好/最好的方法?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

有没有更好或更简洁的方法来做到这一点?像这样的东西会很棒:

sess.upsert_this(desired_default, unique_key = "name")

虽然unique_key kwarg 显然是不必要的(ORM 应该能够轻松解决这个问题),但我添加它只是因为 SQLAlchemy 倾向于只使用主键。例如:我一直在研究Session.merge 是否适用,但这仅适用于主键,在这种情况下,主键是一个自动递增的 id,对此目的并不是非常有用。

一个示例用例是在启动可能已升级其默认预期数据的服务器应用程序时。即:此 upsert 没有并发问题。

【问题讨论】:

  • 如果 name 字段是唯一的,为什么不能将其设为主键(在这种情况下合并会起作用)。为什么需要单独的主键?
  • @abbot:我不想进入 id 字段辩论,但是......简短的回答是“外键”。更长的是,虽然名称确实是唯一需要的唯一键,但存在两个问题。 1) 当模板记录被另一个表中的 5000 万条记录引用时,将该 FK 作为字符串字段是疯狂的。索引整数更好,因此看似毫无意义的 id 列。并且 2) 在此基础上进行扩展,如果字符串 用作 FK,那么现在有两个位置可以在名称更改时更新名称,这很烦人并且充斥着死关系问题。 id 从不改变。
  • 你可以尝试一个新的(测试版)upsert library for python...它与 psycopg2、sqlite3、MySQLdb 兼容

标签: python sqlalchemy upsert


【解决方案1】:

SQLAlchemy 确实具有“保存或更新”行为,在最近的版本中已内置到 session.add 中,但以前是单独的 session.saveorupdate 调用。这不是“upsert”,但它可能足以满足您的需求。

很高兴您询问具有多个唯一键的类;我相信这正是没有单一正确方法可以做到这一点的原因。主键也是唯一键。如果没有唯一约束,只有主键,这将是一个足够简单的问题:如果给定 ID 不存在,或者如果 ID 为 None,则创建一条新记录;否则使用该主键更新现有记录中的所有其他字段。

但是,当有额外的独特约束时,这种简单的方法就会出现逻辑问题。如果您想“更新”一个对象,并且您的对象的主键匹配现有记录,但另一个唯一列匹配 不同 记录,那么你会怎么做?同样,如果主键不匹配现有记录,但另一个唯一列确实匹配现有记录,那会怎样?对于您的特定情况,可能会有一个正确答案,但总的来说,我认为没有一个正确答案。

这就是没有内置“upsert”操作的原因。应用程序必须在每种特定情况下定义这意味着什么。

【讨论】:

    【解决方案2】:

    SQLAlchemy 支持ON CONFLICT 有两种方法on_conflict_do_update()on_conflict_do_nothing()

    the documentation复制:

    from sqlalchemy.dialects.postgresql import insert
    
    stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
    stmt = stmt.on_conflict_do_update(
        index_elements=[my_table.c.user_email],
        index_where=my_table.c.user_email.like('%@gmail.com'),
        set_=dict(data=stmt.excluded.data)
    )
    conn.execute(stmt)
    

    【讨论】:

    • on_duplicate_key_update也支持MySQL
    • 只是execute 无法获得返回ID
    • 这段代码是的,我认为(答案是 3 岁以上),但 Michaels 的评论可能适用于 MySQL。一般来说,我的(这个)答案有点草率地得出结论,postgres 被用作数据库。这不是很好,因为它并没有真正回答所提出的一般性问题。但根据我得到的支持,我认为它对某些人有用,所以我放弃了。
    【解决方案3】:

    如今,SQLAlchemy 提供了两个有用的函数 on_conflict_do_nothingon_conflict_do_update。这些功能很有用,但需要您从 ORM 界面切换到较低级别的界面 - SQLAlchemy Core

    虽然这两个函数使得使用 SQLAlchemy 的语法进行更新插入并不那么困难,但这些函数远没有为更新插入提供完整的开箱即用解决方案。

    我的常见用例是在单个 SQL 查询/会话执行中插入大量行。我通常会遇到两个更新插入的问题:

    例如,我们已经习惯的更高级别的 ORM 功能丢失了。您不能使用 ORM 对象,而是必须在插入时提供 ForeignKeys。

    我正在使用this 以下函数来处理这两个问题:

    def upsert(session, model, rows):
        table = model.__table__
        stmt = postgresql.insert(table)
        primary_keys = [key.name for key in inspect(table).primary_key]
        update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
    
        if not update_dict:
            raise ValueError("insert_or_update resulted in an empty update_dict")
    
        stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                          set_=update_dict)
    
        seen = set()
        foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
        unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
        def handle_foreignkeys_constraints(row):
            for c_name, c_value in foreign_keys.items():
                foreign_obj = row.pop(c_value.table.name, None)
                row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None
    
            for const in unique_constraints:
                unique = tuple([const,] + [row[col.name] for col in const.columns])
                if unique in seen:
                    return None
                seen.add(unique)
    
            return row
    
        rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
        session.execute(stmt, rows)
    

    【讨论】:

    • on_conflict 仅适用于支持原生 ON CONFLICT 子句的后端。因此,只有 postgresql
    • @cowbert 现在 SQLAlchemy 还支持 MySQL 的 ON DUPLICATE KEY UPDATE
    【解决方案4】:

    我使用“先看后跳”的方法:

    # first get the object from the database if it exists
    # we're guaranteed to only get one or zero results
    # because we're filtering by primary key
    switch_command = session.query(Switch_Command).\
        filter(Switch_Command.switch_id == switch.id).\
        filter(Switch_Command.command_id == command.id).first()
    
    # If we didn't get anything, make one
    if not switch_command:
        switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)
    
    # update the stuff we care about
    switch_command.output = 'Hooray!'
    switch_command.lastseen = datetime.datetime.utcnow()
    
    session.add(switch_command)
    # This will generate either an INSERT or UPDATE
    # depending on whether we have a new object or not
    session.commit()
    

    优点是它是 db-neutral 的,我认为它读起来很清楚。缺点是在如下场景中存在潜在的竞态条件

    • 我们在数据库中查询switch_command,但没有找到
    • 我们创建一个switch_command
    • 另一个进程或线程创建了一个与我们的主键相同的switch_command
    • 我们尝试提交我们的switch_command

    【讨论】:

    • This question 使用 try/catch 处理竞争条件
    • upsert 的整个目标是避免这里描述的竞争条件。
    • @sampierson 我知道 - 这就是为什么 SQLALchemy 很难做到干净和便携......我已经在我的回答中强调了竞争条件
    【解决方案5】:

    以下内容适用于我的 redshift 数据库,也适用于组合主键约束。

    来源this

    在函数中创建 SQLAlchemy 引擎只需进行少量修改 def start_engine()

    from sqlalchemy import Column, Integer, Date ,Metadata
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.dialects.postgresql import insert
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.dialects import postgresql
    
    Base = declarative_base()
    
    def start_engine():
        engine = create_engine(os.getenv('SQLALCHEMY_URI', 
        'postgresql://localhost:5432/upsert'))
         connect = engine.connect()
        meta = MetaData(bind=engine)
        meta.reflect(bind=engine)
        return engine
    
    
    class DigitalSpend(Base):
        __tablename__ = 'digital_spend'
        report_date = Column(Date, nullable=False)
        day = Column(Date, nullable=False, primary_key=True)
        impressions = Column(Integer)
        conversions = Column(Integer)
    
        def __repr__(self):
            return str([getattr(self, c.name, None) for c in self.__table__.c])
    
    
    def compile_query(query):
        compiler = query.compile if not hasattr(query, 'statement') else 
      query.statement.compile
        return compiler(dialect=postgresql.dialect())
    
    
    def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
        table = model.__table__
    
        stmt = insert(table).values(rows)
    
        update_cols = [c.name for c in table.c
                       if c not in list(table.primary_key.columns)
                       and c.name not in no_update_cols]
    
        on_conflict_stmt = stmt.on_conflict_do_update(
            index_elements=table.primary_key.columns,
            set_={k: getattr(stmt.excluded, k) for k in update_cols},
            index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
            )
    
        print(compile_query(on_conflict_stmt))
        session.execute(on_conflict_stmt)
    
    
    session = start_engine()
    upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])
    

    【讨论】:

      【解决方案6】:

      这允许基于字符串名称访问底层模型

      def get_class_by_tablename(tablename):
        """Return class reference mapped to table.
        https://stackoverflow.com/questions/11668355/sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to
        :param tablename: String with name of table.
        :return: Class reference or None.
        """
        for c in Base._decl_class_registry.values():
          if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
            return c
      
      
      sqla_tbl = get_class_by_tablename(table_name)
      
      def handle_upsert(record_dict, table):
          """
          handles updates when there are primary key conflicts
      
          """
          try:
              self.active_session().add(table(**record_dict))
          except:
              # Here we'll assume the error is caused by an integrity error
              # We do this because the error classes are passed from the
              # underlying package (pyodbc / sqllite) SQLAlchemy doesn't mask
              # them with it's own code - this should be updated to have
              # explicit error handling for each new db engine
      
              # <update>add explicit error handling for each db engine</update> 
              active_session.rollback()
              # Query for conflic class, use update method to change values based on dict
              c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
              c_tbl_cols = dict(sqla_tbl.__table__.columns) # String:Col Object crosswalk
      
              c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
              c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols
      
              c_target_record = session.query(sqla_tbl).filter(*[k==v for (k,v) in oo_query_dict.items()]).first()
      
              # apply new data values to the existing record
              for k, v in record_dict.items()
                  setattr(c_target_record, k, v)
      

      【讨论】:

        【解决方案7】:

        这对我来说适用于 sqlite3 和 postgres。尽管它可能会因组合主键约束而失败,并且很可能会因额外的唯一约束而失败。

            try:
                t = self._meta.tables[data['table']]
            except KeyError:
                self._log.error('table "%s" unknown', data['table'])
                return
        
            try:
                q = insert(t, values=data['values'])
                self._log.debug(q)
                self._db.execute(q)
            except IntegrityError:
                self._log.warning('integrity error')
                where_clause = [c.__eq__(data['values'][c.name]) for c in t.c if c.primary_key]
                update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
                q = update(t, values=update_dict).where(*where_clause)
                self._log.debug(q)
                self._db.execute(q)
            except Exception as e:
                self._log.error('%s: %s', t.name, e)
        

        【讨论】:

          【解决方案8】:

          有多个答案,这里还有另一个答案 (YAA)。由于涉及元编程,其他答案不那么可读。这是一个例子

          • 使用 SQLAlchemy ORM

          • 展示了如何使用on_conflict_do_nothing在零行的情况下创建行

          • 展示如何在不使用on_conflict_do_update 创建新行的情况下更新现有行(如果有)

          • 使用表主键作为constraint

          the original question what this code is related to 中的更长示例。

          
          import sqlalchemy as sa
          import sqlalchemy.orm as orm
          from sqlalchemy import text
          from sqlalchemy.dialects.postgresql import insert
          from sqlalchemy.orm import Session
          
          class PairState(Base):
          
              __tablename__ = "pair_state"
          
              # This table has 1-to-1 relationship with Pair
              pair_id = sa.Column(sa.ForeignKey("pair.id"), nullable=False, primary_key=True, unique=True)
              pair = orm.relationship(Pair,
                                  backref=orm.backref("pair_state",
                                                  lazy="dynamic",
                                                  cascade="all, delete-orphan",
                                                  single_parent=True, ), )
          
          
              # First raw event in data stream
              first_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))
          
              # Last raw event in data stream
              last_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))
          
              # The last hypertable entry added
              last_interval_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))
          
              @staticmethod
              def create_first_event_if_not_exist(dbsession: Session, pair_id: int, ts: datetime.datetime):
                  """Sets the first event value if not exist yet."""
                  dbsession.execute(
                      insert(PairState).
                      values(pair_id=pair_id, first_event_at=ts).
                      on_conflict_do_nothing()
                  )
          
              @staticmethod
              def update_last_event(dbsession: Session, pair_id: int, ts: datetime.datetime):
                  """Replaces the the column last_event_at for a named pair."""
                  # Based on the original example of https://stackoverflow.com/a/49917004/315168
                  dbsession.execute(
                      insert(PairState).
                      values(pair_id=pair_id, last_event_at=ts).
                      on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_event_at": ts})
                  )
          
              @staticmethod
              def update_last_interval(dbsession: Session, pair_id: int, ts: datetime.datetime):
                  """Replaces the the column last_interval_at for a named pair."""
                  dbsession.execute(
                      insert(PairState).
                      values(pair_id=pair_id, last_interval_at=ts).
                      on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_interval_at": ts})
                  )
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2017-06-03
            • 1970-01-01
            • 1970-01-01
            • 2013-06-23
            • 2015-03-13
            • 2012-07-17
            • 2021-11-27
            相关资源
            最近更新 更多