【问题标题】:Postgres 9.5 upsert command in pandas or psycopg2?熊猫或 psycopg2 中的 Postgres 9.5 upsert 命令?
【发布时间】:2017-07-30 10:35:12
【问题描述】:

我看到的大多数示例是人们使用 ON CONFLICT DO UPDATE 语法将单行插入数据库。

有人有使用 SQLAlchemy 或 pandas.to_sql 的示例吗?

我 99% 的插入使用 psycopg2 COPY 命令(所以我保存了 csv 或 stringio,然后进行批量插入),另外 1% 使用 pd.to_sql。我检查新行或新维度的所有逻辑都是在 Python 中完成的。

def find_new_rows(existing, current, id_col):
        current[id_col] = current[id_col].astype(int)
        x = existing[['datetime', id_col, 'key1']]
        y = current[['datetime', id_col, 'key2']]
        final = pd.merge(y, x, how='left', on=['datetime', id_col])
        final = final[~(final['key2'] == final['key1'])]
        final = final.drop(['key1'], axis=1)
        current = pd.merge(current, final, how='left', on=['datetime', id_col])
        current = current.loc[current['key2_y'] == 1]
        current.drop(['key2_x', 'key2_y'], axis=1, inplace=True)
        return current

谁能给我看一个使用 pyscopg2 进行 upsert 的新 PostgreSQL 语法的示例?一个常见的用例是检查维度变化(每天在 50k - 100k 行之间,我将其与现有值进行比较),这是 CONFLICT DO NOTHING 只添加新行。

另一个用例是我有随时间变化的事实数据。我只取最近的值(我目前使用视图来选择不同的),但如果可能的话,最好使用 UPSERT。

【问题讨论】:

    标签: pandas psycopg2 postgresql-9.5


    【解决方案1】:

    仅供参考,这是我目前使用的解决方案。

    对于我的目的来说,它似乎工作得很好。我不得不添加一行来用 None 替换 null (NaT) 时间戳,因为当我将每一行加载到数据库中时出现错误。

    def create_update_query(table):
        """This function creates an upsert query which replaces existing data based on primary key conflicts"""
        columns = ', '.join([f'{col}' for col in DATABASE_COLUMNS])
        constraint = ', '.join([f'{col}' for col in PRIMARY_KEY])
        placeholder = ', '.join([f'%({col})s' for col in DATABASE_COLUMNS])
        updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in DATABASE_COLUMNS])
        query = f"""INSERT INTO {table} ({columns}) 
                    VALUES ({placeholder}) 
                    ON CONFLICT ({constraint}) 
                    DO UPDATE SET {updates};"""
        query.split()
        query = ' '.join(query.split())
        return query
    
    
    def load_updates(df, table, connection):
        conn = connection.get_conn()
        cursor = conn.cursor()
        df1 = df.where((pd.notnull(df)), None)
        insert_values = df1.to_dict(orient='records')
        for row in insert_values:
            cursor.execute(create_update_query(table=table), row)
            conn.commit()
        row_count = len(insert_values)
        logging.info(f'Inserted {row_count} rows.')
        cursor.close()
        del cursor
        conn.close()
    

    【讨论】:

    • 请问f'%({col})s'是什么意思
    • 在列表理解中使用 Python 格式字符串和 col 变量。因此,变量列是从我在表中的所有列的列表中生成的。它采用这些值并添加一个逗号(column1、column2、column3...)。最后,该函数生成一个长查询,指定要替换哪些列,哪些列是唯一的,等等。
    • 你可以在 python (3.6+) 中试试这个: DATABASE_COLUMNS = ['column1', 'column2', 'column3'], columns = ', '.join([f'{col}' for col in DATABASE_COLUMNS]), query = f"""INSERT INTO {table} ({columns})""", print(query)
    【解决方案2】:

    这是我在 pandas 数据帧中对 postgresql 的冲突更新查询进行批量插入和插入的代码:

    假设 id 是 postgresql 表和 pandas df 的唯一键,您想根据此 id 插入和更新。

    import pandas as pd
    from sqlalchemy import create_engine, text
    
    engine = create_engine(postgresql://username:pass@host:port/dbname)
    query = text(f""" 
                    INSERT INTO schema.table(name, title, id)
                    VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
                    ON CONFLICT (id)
                    DO  UPDATE SET name= excluded.name,
                                   title= excluded.title
             """)
    engine.execute(query)
    

    确保您的 df 列的顺序必须与您的表格相同。

    【讨论】:

      猜你喜欢
      • 2016-10-01
      • 1970-01-01
      • 1970-01-01
      • 2017-11-22
      • 1970-01-01
      • 1970-01-01
      • 2016-04-18
      • 2020-01-23
      • 2016-03-13
      相关资源
      最近更新 更多