【发布时间】:2020-08-05 13:33:14
【问题描述】:
我从网络资源中抓取了一些数据并将其全部存储在 pandas DataFrame 中。现在,为了利用 SQLAlchemy 提供的强大 db 工具,我想将所述 DataFrame 转换为 Table() 对象,并最终将所有数据插入到 PostgreSQL 表中。如果这是可行的,那么完成这项任务的可行方法是什么?
【问题讨论】:
标签: python pandas postgresql sqlalchemy upsert
我从网络资源中抓取了一些数据并将其全部存储在 pandas DataFrame 中。现在,为了利用 SQLAlchemy 提供的强大 db 工具,我想将所述 DataFrame 转换为 Table() 对象,并最终将所有数据插入到 PostgreSQL 表中。如果这是可行的,那么完成这项任务的可行方法是什么?
【问题讨论】:
标签: python pandas postgresql sqlalchemy upsert
如果您使用的是 PostgreSQL 9.5 或更高版本,您可以使用临时表和 INSERT ... ON CONFLICT 语句执行 UPSERT:
import sqlalchemy as sa
# …
with engine.begin() as conn:
# step 0.0 - create test environment
conn.execute(sa.text("DROP TABLE IF EXISTS main_table"))
conn.execute(
sa.text(
"CREATE TABLE main_table (id int primary key, txt varchar(50))"
)
)
conn.execute(
sa.text(
"INSERT INTO main_table (id, txt) VALUES (1, 'row 1 old text')"
)
)
# step 0.1 - create DataFrame to UPSERT
df = pd.DataFrame(
[(2, "new row 2 text"), (1, "row 1 new text")], columns=["id", "txt"]
)
# step 1 - create temporary table and upload DataFrame
conn.execute(
sa.text(
"CREATE TEMPORARY TABLE temp_table (id int primary key, txt varchar(50))"
)
)
df.to_sql("temp_table", conn, index=False, if_exists="append")
# step 2 - merge temp_table into main_table
conn.execute(
sa.text("""\
INSERT INTO main_table (id, txt)
SELECT id, txt FROM temp_table
ON CONFLICT (id) DO
UPDATE SET txt = EXCLUDED.txt
"""
)
)
# step 3 - confirm results
result = conn.execute(sa.text("SELECT * FROM main_table ORDER BY id")).fetchall()
print(result) # [(1, 'row 1 new text'), (2, 'new row 2 text')]
【讨论】:
我已经需要这个很多次了,我最终创建了一个gist for it。
函数如下,如果是第一次持久化dataframe会创建表,如果已经存在则更新表:
import pandas as pd
import sqlalchemy
import uuid
import os
def upsert_df(df: pd.DataFrame, table_name: str, engine: sqlalchemy.engine.Engine):
"""Implements the equivalent of pd.DataFrame.to_sql(..., if_exists='update')
(which does not exist). Creates or updates the db records based on the
dataframe records.
Conflicts to determine update are based on the dataframes index.
This will set unique keys constraint on the table equal to the index names
1. Create a temp table from the dataframe
2. Insert/update from temp table into table_name
Returns: True if successful
"""
# If the table does not exist, we should just use to_sql to create it
if not engine.execute(
f"""SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = '{table_name}');
"""
).first()[0]:
df.to_sql(table_name, engine)
return True
# If it already exists...
temp_table_name = f"temp_{uuid.uuid4().hex[:6]}"
df.to_sql(temp_table_name, engine, index=True)
index = list(df.index.names)
index_sql_txt = ", ".join([f'"{i}"' for i in index])
columns = list(df.columns)
headers = index + columns
headers_sql_txt = ", ".join(
[f'"{i}"' for i in headers]
) # index1, index2, ..., column 1, col2, ...
# col1 = exluded.col1, col2=excluded.col2
update_column_stmt = ", ".join([f'"{col}" = EXCLUDED."{col}"' for col in columns])
# For the ON CONFLICT clause, postgres requires that the columns have unique constraint
query_pk = f"""
ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS unique_constraint_for_upsert;
ALTER TABLE "{table_name}" ADD CONSTRAINT unique_constraint_for_upsert UNIQUE ({index_sql_txt});
"""
engine.execute(query_pk)
# Compose and execute upsert query
query_upsert = f"""
INSERT INTO "{table_name}" ({headers_sql_txt})
SELECT {headers_sql_txt} FROM "{temp_table_name}"
ON CONFLICT ({index_sql_txt}) DO UPDATE
SET {update_column_stmt};
"""
engine.execute(query_upsert)
engine.execute(f"DROP TABLE {temp_table_name}")
return True
【讨论】:
pd.DataFrame.to_sql(..., if_exists='update'),它甚至添加了一个索引级别的重复约束,因此重复项不可能出现在表中。
这是我在 pandas 数据帧中对 postgresql 的冲突更新查询进行批量插入和插入的代码:
假设 id 是 postgresql 表和 pandas df 的唯一键,您想根据此 id 插入和更新。
import pandas as pd
from sqlalchemy import create_engine, text
engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text(f"""
INSERT INTO schema.table(name, title, id)
VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
ON CONFLICT (id)
DO UPDATE SET name= excluded.name,
title= excluded.title
""")
engine.execute(query)
确保您的 df 列的顺序必须与您的表格相同。
编辑 1:
感谢 Gord Thompson 的评论,我意识到如果列中有单引号,此查询将不起作用。因此,如果列中有单引号,这是一个修复:
import pandas as pd
from sqlalchemy import create_engine, text
df.name = df.name.str.replace("'", "''")
df.title = df.title.str.replace("'", "''")
engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text("""
INSERT INTO author(name, title, id)
VALUES %s
ON CONFLICT (id)
DO UPDATE SET name= excluded.name,
title= excluded.title
""" % ','.join([str(i) for i in list(df.to_records(index=False))]).replace('"', "'"))
engine.execute(query)
【讨论】:
name 或title 包含单引号,上述代码将失败。示例here.
name 或title 包含double quotes,代码将失败。 :(
如果您的 DataFrame 和 SQL 表已经包含相同的列名和类型,请考虑使用此函数。 优点:
.
from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd
def upsert_database(list_input: pd.DataFrame, engine: sql_engine, table: str, schema: str) -> None:
if len(list_input) == 0:
return None
flattened_input = list_input.to_dict('records')
with engine.connect() as conn:
base = automap_base()
base.prepare(engine, reflect=True, schema=schema)
target_table = Table(table, base.metadata,
autoload=True, autoload_with=engine, schema=schema)
chunks = [flattened_input[i:i + 1000] for i in range(0, len(flattened_input), 1000)]
for chunk in chunks:
stmt = insert(target_table).values(chunk)
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
conn.execute(stmt.on_conflict_do_update(
constraint=f'{table}_pkey',
set_=update_dict)
)
【讨论】:
如果你已经有一个 pandas 数据框,你可以使用 df.to_sql 直接通过 SQLAlchemy 推送数据
from sqlalchemy import create_engine
#create a connection from Postgre URI
cnxn = create_engine("postgresql+psycopg2://username:password@host:port/database")
#write dataframe to database
df.to_sql("my_table", con=cnxn, schema="myschema")
【讨论】: