【问题标题】:How to upsert pandas DataFrame to PostgreSQL table?如何将 pandas DataFrame 插入到 PostgreSQL 表中?
【发布时间】:2020-08-05 13:33:14
【问题描述】:

我从网络资源中抓取了一些数据并将其全部存储在 pandas DataFrame 中。现在,为了利用 SQLAlchemy 提供的强大 db 工具,我想将所述 DataFrame 转换为 Table() 对象,并最终将所有数据插入到 PostgreSQL 表中。如果这是可行的,那么完成这项任务的可行方法是什么?

【问题讨论】:

    标签: python pandas postgresql sqlalchemy upsert


    【解决方案1】:

    如果您使用的是 PostgreSQL 9.5 或更高版本,您可以使用临时表和 INSERT ... ON CONFLICT 语句执行 UPSERT:

    import sqlalchemy as sa
    
    # …
    
    with engine.begin() as conn:
        # step 0.0 - create test environment
        conn.execute(sa.text("DROP TABLE IF EXISTS main_table"))
        conn.execute(
            sa.text(
                "CREATE TABLE main_table (id int primary key, txt varchar(50))"
            )
        )
        conn.execute(
            sa.text(
                "INSERT INTO main_table (id, txt) VALUES (1, 'row 1 old text')"
            )
        )
        # step 0.1 - create DataFrame to UPSERT
        df = pd.DataFrame(
            [(2, "new row 2 text"), (1, "row 1 new text")], columns=["id", "txt"]
        )
        
        # step 1 - create temporary table and upload DataFrame
        conn.execute(
            sa.text(
                "CREATE TEMPORARY TABLE temp_table (id int primary key, txt varchar(50))"
            )
        )
        df.to_sql("temp_table", conn, index=False, if_exists="append")
    
        # step 2 - merge temp_table into main_table
        conn.execute(
            sa.text("""\
                INSERT INTO main_table (id, txt) 
                SELECT id, txt FROM temp_table
                ON CONFLICT (id) DO
                    UPDATE SET txt = EXCLUDED.txt
                """
            )
        )
    
        # step 3 - confirm results
        result = conn.execute(sa.text("SELECT * FROM main_table ORDER BY id")).fetchall()
        print(result)  # [(1, 'row 1 new text'), (2, 'new row 2 text')]
    

    【讨论】:

      【解决方案2】:

      我已经需要这个很多次了,我最终创建了一个gist for it

      函数如下,如果是第一次持久化dataframe会创建表,如果已经存在则更新表:

      import pandas as pd
      import sqlalchemy
      import uuid
      import os
      
      def upsert_df(df: pd.DataFrame, table_name: str, engine: sqlalchemy.engine.Engine):
          """Implements the equivalent of pd.DataFrame.to_sql(..., if_exists='update')
          (which does not exist). Creates or updates the db records based on the
          dataframe records.
          Conflicts to determine update are based on the dataframes index.
          This will set unique keys constraint on the table equal to the index names
          1. Create a temp table from the dataframe
          2. Insert/update from temp table into table_name
          Returns: True if successful
          """
      
          # If the table does not exist, we should just use to_sql to create it
          if not engine.execute(
              f"""SELECT EXISTS (
                  SELECT FROM information_schema.tables 
                  WHERE  table_schema = 'public'
                  AND    table_name   = '{table_name}');
                  """
          ).first()[0]:
              df.to_sql(table_name, engine)
              return True
      
          # If it already exists...
          temp_table_name = f"temp_{uuid.uuid4().hex[:6]}"
          df.to_sql(temp_table_name, engine, index=True)
      
          index = list(df.index.names)
          index_sql_txt = ", ".join([f'"{i}"' for i in index])
          columns = list(df.columns)
          headers = index + columns
          headers_sql_txt = ", ".join(
              [f'"{i}"' for i in headers]
          )  # index1, index2, ..., column 1, col2, ...
      
          # col1 = exluded.col1, col2=excluded.col2
          update_column_stmt = ", ".join([f'"{col}" = EXCLUDED."{col}"' for col in columns])
      
          # For the ON CONFLICT clause, postgres requires that the columns have unique constraint
          query_pk = f"""
          ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS unique_constraint_for_upsert;
          ALTER TABLE "{table_name}" ADD CONSTRAINT unique_constraint_for_upsert UNIQUE ({index_sql_txt});
          """
          engine.execute(query_pk)
      
          # Compose and execute upsert query
          query_upsert = f"""
          INSERT INTO "{table_name}" ({headers_sql_txt}) 
          SELECT {headers_sql_txt} FROM "{temp_table_name}"
          ON CONFLICT ({index_sql_txt}) DO UPDATE 
          SET {update_column_stmt};
          """
          engine.execute(query_upsert)
          engine.execute(f"DROP TABLE {temp_table_name}")
      
          return True
      

      【讨论】:

      • 魔术,这很好用!很容易成为SO的最佳答案。正如评论中提到的,它完全等同于pd.DataFrame.to_sql(..., if_exists='update'),它甚至添加了一个索引级别的重复约束,因此重复项不可能出现在表中。
      【解决方案3】:

      这是我在 pandas 数据帧中对 postgresql 的冲突更新查询进行批量插入和插入的代码:

      假设 id 是 postgresql 表和 pandas df 的唯一键,您想根据此 id 插入和更新。

      import pandas as pd
      from sqlalchemy import create_engine, text
      
      engine = create_engine(postgresql://username:pass@host:port/dbname)
      query = text(f""" 
                      INSERT INTO schema.table(name, title, id)
                      VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
                      ON CONFLICT (id)
                      DO  UPDATE SET name= excluded.name,
                                     title= excluded.title
               """)
      engine.execute(query)
      

      确保您的 df 列的顺序必须与您的表格相同。

      编辑 1:

      感谢 Gord Thompson 的评论,我意识到如果列中有单引号,此查询将不起作用。因此,如果列中有单引号,这是一个修复:

      import pandas as pd
      from sqlalchemy import create_engine, text
      
      df.name = df.name.str.replace("'", "''")
      df.title = df.title.str.replace("'", "''")
      engine = create_engine(postgresql://username:pass@host:port/dbname)
      query = text(""" 
                  INSERT INTO author(name, title, id)
                  VALUES %s
                  ON CONFLICT (id)
                  DO  UPDATE SET name= excluded.name,
                                 title= excluded.title
           """ % ','.join([str(i) for i in list(df.to_records(index=False))]).replace('"', "'"))
      engine.execute(query)
      

      【讨论】:

      • SQL 注入问题: 如果nametitle 包含单引号,上述代码将失败。示例here.
      • @GordThompson 感谢您的评论。我已经在上面编辑了我的解决方案
      • 现在如果nametitle 包含double quotes,代码将失败。 :(
      • 是否有使用 sql 参数而不是 % 字符串插值的版本? df 中的 NULL 值会破坏字符串插值,就像这里的情况一样。
      【解决方案4】:

      如果您的 DataFrame 和 SQL 表已经包含相同的列名和类型,请考虑使用此函数。 优点:

      • 如果您要插入一个长数据框,那就太好了。 (批处理)
      • 避免在代码中编写长 sql 语句。
      • 快速

      .

      from sqlalchemy import Table
      from sqlalchemy.engine.base import Engine as sql_engine
      from sqlalchemy.dialects.postgresql import insert
      from sqlalchemy.ext.automap import automap_base
      import pandas as pd
      
      
      def upsert_database(list_input: pd.DataFrame, engine: sql_engine, table: str, schema: str) -> None:
          if len(list_input) == 0:
              return None
          flattened_input = list_input.to_dict('records')
          with engine.connect() as conn:
              base = automap_base()
              base.prepare(engine, reflect=True, schema=schema)
              target_table = Table(table, base.metadata,
                                   autoload=True, autoload_with=engine, schema=schema)
              chunks = [flattened_input[i:i + 1000] for i in range(0, len(flattened_input), 1000)]
              for chunk in chunks:
                  stmt = insert(target_table).values(chunk)
                  update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
                  conn.execute(stmt.on_conflict_do_update(
                      constraint=f'{table}_pkey',
                      set_=update_dict)
                  )
      

      【讨论】:

      • 我想使用它,但我对 sqlalchemy 的所有新功能感到有点害怕。如果您有机会解释或评论此答案,我认为这对于我们这些需要从数据框进行更新插入的人来说可能是一个很好的选择。
      【解决方案5】:

      如果你已经有一个 pandas 数据框,你可以使用 df.to_sql 直接通过 SQLAlchemy 推送数据

      from sqlalchemy import create_engine
      #create a connection from Postgre URI
      cnxn = create_engine("postgresql+psycopg2://username:password@host:port/database")
      #write dataframe to database
      df.to_sql("my_table", con=cnxn, schema="myschema")
      

      【讨论】:

      • 确实,这当然是一个可行的选择,感谢您的意见!但是,我希望更新数据 - 而不仅仅是插入或替换表。这就是我认为 sqlalchemy 可能是更好的选择的地方。
      • stackoverflow.com/questions/25955200/… 也许您可以将此包装器用于 SqlAlchemy Insert,它使用 on commit 子句动态实现 upsert?
      • 是的,这仅适用于具有 Table() sqlalchemy 对象的情况。为此,我首先需要将 pandas df 转换为 Table() 对象。 - 这是我想做的主要和第一件事
      猜你喜欢
      • 2017-06-04
      • 2018-08-13
      • 2020-10-04
      • 2021-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-24
      • 2021-06-27
      • 1970-01-01
      相关资源
      最近更新 更多