【问题标题】:Upsert statement with Flask-SQLAlchemy使用 Flask-SQLAlchemy 的 Upsert 语句
【发布时间】:2021-11-27 19:45:55
【问题描述】:

我有一个 Flask 应用程序,它解析公共选举数据的 CSV 并将结果插入 Postgres 数据库。它是一个旧的、非 Flask 的 Python 2 应用程序的一个端口,它使用了不再工作的各种库。我主要尝试将应用程序的结构基于this tutorial。我一直在使用 Flask-SQLAlchemy 为数据库表构建一些模型并从 CSV 填充数据。

在这种情况下,我正在使用区域模型,该模型对应于可能进行选举的地理区域(房屋区、学校董事会区等)。以下是我的基本蓝图路线:

election = None

@bp.route('/areas')
def scrape_areas():
    area = Area()
    sources = area.read_sources()
    election = area.set_election()

    if election not in sources:
        return

    # Get metadata about election
    election_meta = sources[election]['meta'] if 'meta' in sources[election] else {}

    for i in sources[election]:
        source = sources[election][i]

        if 'type' in source and source['type'] == 'areas':

            rows = area.parse_election(source, election_meta)
            count = 0

            for row in rows:
                parsed = area.parser(row, i)

                area = Area()
                area.from_dict(parsed, new=True)
                # this shows the generated string of area_id
                # which is a UNIQUE key in the database
                print(area)

                db.session.add(area)
                db.session.commit()
                count = count + 1
            
    return count

这是models.py:

import logging
import os
import json
import re
import csv
import urllib.request

import calendar
import datetime
from flask import current_app
from app import db

LOG = logging.getLogger(__name__)
scraper_sources_inline = None

class ScraperModel(object):

    nonpartisan_parties = ['NP', 'WI', 'N P']

    def __init__(self, group_type = None):
        """
        Constructor
        """

        # this is where scraperwiki was creating and connecting to its database
        # we do this in the imported sql file instead

        self.read_sources()


    def read_sources(self):
        """
        Read the scraper_sources.json file.
        """
        if scraper_sources_inline is not None:
            self.sources = json.loads(scraper_sources_inline)
        else:
            #sources_file = current_app.config['SOURCES_FILE']
            sources_file = os.path.join(current_app.root_path, '../scraper_sources.json')
            data = open(sources_file)
            self.sources = json.load(data)

        return self.sources


    def set_election(self):
        # Get the newest set
        newest = 0
        for s in self.sources:
            newest = int(s) if int(s) > newest else newest

        newest_election = str(newest)
        election = newest_election
        # Usually we just want the newest election but allow for other situations
        election = election if election is not None and election != '' else newest_election
        return election


    def parse_election(self, source, election_meta = {}):

        # Ensure we have a valid parser for this type
        parser_method = getattr(self, "parser", None)
        if callable(parser_method):
            # Check if election has base_url
            source['url'] = election_meta['base_url'] + source['url'] if 'base_url' in election_meta else source['url']

            # Get data from URL
            try:
                response = urllib.request.urlopen(source['url'])
                lines = [l.decode('latin-1') for l in response.readlines()]
                rows = csv.reader(lines, delimiter=';')
                return rows
            except Exception as err:
                LOG.exception('[%s] Error when trying to read URL and parse CSV: %s' % (source['type'], source['url']))
                raise


    def from_dict(self, data, new=False):
        for field in data:
            setattr(self, field, data[field])


class Area(ScraperModel, db.Model):

    __tablename__ = "areas"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    area_id = db.Column(db.String(255), unique=True, nullable=False)
    areas_group = db.Column(db.String(255))
    county_id = db.Column(db.String(255))
    county_name = db.Column(db.String(255))
    ward_id = db.Column(db.String(255))
    precinct_id = db.Column(db.String(255))
    precinct_name = db.Column(db.String(255))
    state_senate_id = db.Column(db.String(255))
    state_house_id = db.Column(db.String(255))
    county_commissioner_id = db.Column(db.String(255))
    district_court_id = db.Column(db.String(255))
    soil_water_id = db.Column(db.String(255))
    school_district_id = db.Column(db.String(255))
    school_district_name = db.Column(db.String(255))
    mcd_id = db.Column(db.String(255))
    precincts = db.Column(db.String(255))
    name = db.Column(db.String(255))
    updated = db.Column(db.DateTime, default=db.func.current_timestamp(), onupdate=db.func.current_timestamp())

    def __repr__(self):
        return '<Area {}>'.format(self.area_id)

    def parser(self, row, group):

        # General data
        parsed = {
            'area_id': group + '-',
            'areas_group': group,
            'county_id': None,
            'county_name': None,
            'ward_id': None,
            'precinct_id': None,
            'precinct_name': '',
            'state_senate_id': None,
            'state_house_id': None,
            'county_commissioner_id': None,
            'district_court_id': None,
            'soil_water_id': None,
            'school_district_id': None,
            'school_district_name': '',
            'mcd_id': None,
            'precincts': None,
            'name': ''
        }

        if group == 'municipalities':
            parsed['area_id'] = parsed['area_id'] + row[0] + '-' + row[2]
            parsed['county_id'] = row[0]
            parsed['county_name'] = row[1]
            parsed['mcd_id'] = "{0:05d}".format(int(row[2])) #enforce 5 digit
            parsed['name'] = row[1]

        if group == 'counties':
            parsed['area_id'] = parsed['area_id'] + row[0]
            parsed['county_id'] = row[0]
            parsed['county_name'] = row[1]
            parsed['precincts'] = row[2]

        if group == 'precincts':
            parsed['area_id'] = parsed['area_id'] + row[0] + '-' + row[1]
            parsed['county_id'] = row[0]
            parsed['precinct_id'] = row[1]
            parsed['precinct_name'] = row[2]
            parsed['state_senate_id'] = row[3]
            parsed['state_house_id'] = row[4]
            parsed['county_commissioner_id'] = row[5]
            parsed['district_court_id'] = row[6]
            parsed['soil_water_id'] = row[7]
            parsed['mcd_id'] = row[8]

        if group == 'school_districts':
            parsed['area_id'] = parsed['area_id'] + row[0]
            parsed['school_district_id'] = row[0]
            parsed['school_district_name'] = row[1]
            parsed['county_id'] = row[2]
            parsed['county_name'] = row[3]

        return parsed

因此,Areas 是我的默认模型类的扩展,因为它允许我根据 CSV 设置特定于给定区域的字段。

此代码失败的情况是 CSV 数据中(相对)罕见的情况,在旧应用程序中,可能有多行对应于表中的同一行。那个旧应用程序有一个数组(通常只有一个项目,代表数据库中的唯一列)来指示代码对这些行运行更新。

它返回如下错误:

sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "areas_area_id_key"
DETAIL:  Key (area_id)=(counties-01) already exists.

当我只是从模型中记录我的唯一键值而不是插入它时它如何运行的示例:

<Area precincts-87-0140>
<Area precincts-87-0145>
<Area precincts-87-0150>
<Area precincts-87-0155>
<Area precincts-87-0160>
<Area precincts-87-0165>
<Area school_districts-0001>
<Area school_districts-0001>
<Area school_districts-0001>
<Area school_districts-0002>
<Area school_districts-0004>
<Area school_districts-0006>
<Area school_districts-0012>
<Area school_districts-0013>
<Area school_districts-0014>

所以我一直在研究 Flask 可以用来运行 UPSERT 语句的不同方法,因为我需要更新所有字段,并且它们会根据区域的类型而有所不同,并且也在其他模型中(例如选举竞赛或结果)。我发现的大部分内容都使用 SQLAlchemy 而不是 Flask-SQLAlchemy。

  1. 我发现this answer 看起来很有希望。这是我添加到模型中的内容:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

然后我像这样修改了ScraperModel 类:

class ScraperModel(object):

    @compiles(Insert)
    def compile_upsert(insert_stmt, compiler, **kwargs):
        """
        converts every SQL insert to an upsert  i.e;
        INSERT INTO test (foo, bar) VALUES (1, 'a')
        becomes:
        INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT(foo) DO UPDATE SET (bar = EXCLUDED.bar)
        (assuming foo is a primary key)
        :param insert_stmt: Original insert statement
        :param compiler: SQL Compiler
        :param kwargs: optional arguments
        :return: upsert statement
        """
        pk = insert_stmt.table.primary_key
        insert = compiler.visit_insert(insert_stmt, **kwargs)
        ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
        updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
        upsert = ' '.join((insert, ondup, updates))
        return upsert

由于查询的输出方式,我显然误解了 insert_stmt 的工作原理,但这是它生成的错误:

sqlalchemy.exc.ProgrammingError: (psycopg2.errors.SyntaxError) syntax error at or near "ON"
LINE 1: ..., '54', '', CURRENT_TIMESTAMP) RETURNING areas.id ON CONFLIC...
                                                             ^

[SQL: INSERT INTO areas (area_id, areas_group, county_id, county_name, ward_id, precinct_id, precinct_name, state_senate_id, state_house_id, county_commissioner_id, district_court_id, soil_water_id, school_district_id, school_district_name, mcd_id, precincts, name, updated) VALUES (%(area_id)s, %(areas_group)s, %(county_id)s, %(county_name)s, %(ward_id)s, %(precinct_id)s, %(precinct_name)s, %(state_senate_id)s, %(state_house_id)s, %(county_commissioner_id)s, %(district_court_id)s, %(soil_water_id)s, %(school_district_id)s, %(school_district_name)s, %(mcd_id)s, %(precincts)s, %(name)s, CURRENT_TIMESTAMP) RETURNING areas.id ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, area_id=EXCLUDED.area_id, areas_group=EXCLUDED.areas_group, county_id=EXCLUDED.county_id, county_name=EXCLUDED.county_name, ward_id=EXCLUDED.ward_id, precinct_id=EXCLUDED.precinct_id, precinct_name=EXCLUDED.precinct_name, state_senate_id=EXCLUDED.state_senate_id, state_house_id=EXCLUDED.state_house_id, county_commissioner_id=EXCLUDED.county_commissioner_id, district_court_id=EXCLUDED.district_court_id, soil_water_id=EXCLUDED.soil_water_id, school_district_id=EXCLUDED.school_district_id, school_district_name=EXCLUDED.school_district_name, mcd_id=EXCLUDED.mcd_id, precincts=EXCLUDED.precincts, name=EXCLUDED.name, updated=EXCLUDED.updated]
[parameters: {'area_id': 'counties-01', 'areas_group': 'counties', 'county_id': '01', 'county_name': 'Aitkin', 'ward_id': None, 'precinct_id': None, 'precinct_name': '', 'state_senate_id': None, 'state_house_id': None, 'county_commissioner_id': None, 'district_court_id': None, 'soil_water_id': None, 'school_district_id': None, 'school_district_name': '', 'mcd_id': None, 'precincts': '54', 'name': ''}]
(Background on this error at: https://sqlalche.me/e/14/f405)

我希望我没有粘贴太多以帮助那里。

  1. 我还发现了this answer,我将其解读为创建自己的插入语句,而不是编译内置语句。这是我改变的。在蓝图的导入中:
from sqlalchemy.dialects.postgresql import insert

在蓝图的循环中:

for i in sources[election]:
        source = sources[election][i]

        if 'type' in source and source['type'] == 'areas':

            rows = area.parse_election(source, election_meta)

            count = 0

            for row in rows:
                parsed = area.parser(row, i)

                area = Area()
                area.from_dict(parsed, new=True)

                stmt = insert(Area.__table__).values(parsed)
                stmt = stmt.on_conflict_do_update(
                    # Let's use the constraint name which was visible in the original posts error msg
                    constraint="['area_id']",

                    # The columns that should be updated on conflict
                    set_={
                        parsed
                    }
                )
                db.session.execute(stmt)
                count = count + 1

    return count

它会导致不同的错误:

TypeError: unhashable type: 'dict'


说了这么多,我现在很茫然。我很清楚我需要修改 INSERT 语句,但我不清楚我应该采取哪条路线来修改它,如何确保它与正确的字段匹配(称为area_id,关键是称为areas_id_unique),或者如何确保它在找到匹配项时更新正确的字段。

【问题讨论】:

  • 在第二个变体中,parsed 已经是dict,因此您不需要将它用大括号括起来:set_=parsed 应该可以工作。 “我发现的大部分内容都使用 SQLAlchemy 而不是 Flask-SQLAlchemy”:flask-sqlalchemy 是 SQLAlchemy 本身的一个相当薄的包装器:通常您可以将事物引用为 db.sqlalchemy_thing 而不是 sqlalchemy.sqlalchemy_thing,它会起作用。跨度>
  • 使用set_=parsed不行;它有同样的错误。

标签: postgresql flask sqlalchemy flask-sqlalchemy


【解决方案1】:

我认为我发现这些都不起作用,因为我不是在主键上匹配,而是在唯一键上匹配。我所做的是将唯一键 area_id 更改为主键。然后,我可以使用上面的 upsert 语句。

@compiles(Insert)
    def compile_upsert(insert_stmt, compiler, **kwargs):
        """
        converts every SQL insert to an upsert  i.e;
        INSERT INTO test (foo, bar) VALUES (1, 'a')
        becomes:
        INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT(foo) DO UPDATE SET (bar = EXCLUDED.bar)
        (assuming foo is a primary key)
        :param insert_stmt: Original insert statement
        :param compiler: SQL Compiler
        :param kwargs: optional arguments
        :return: upsert statement
        """
        pk = insert_stmt.table.primary_key
        insert = compiler.visit_insert(insert_stmt, **kwargs)
        ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
        updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
        upsert = ' '.join((insert, ondup, updates))
        return upsert

我一直试图更改 pk = insert_stmt.table.primary_key 行以检查唯一键但没有成功,但如果我更改该字段,它的工作原理就是这样。

更改主键也修复了我尝试的其他解决方案:

group = []
for row in rows:
    parsed = area.parser(row, i)

    area = Area()
    area.from_dict(parsed, new=True)

    group.append(area)

insert(db.session, Area, group)


def insert(session, model, rows):
    table = model.__table__
    stmt = insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(
        index_elements=primary_keys,
        set_=update_dict
    )

因此,这两种解决方案(相对)都是可行的,但仅使用主键而不是唯一键,而这对我来说还不是很清楚。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-02-01
    • 2018-10-18
    • 2019-04-04
    • 2015-03-16
    • 2010-12-18
    • 2011-11-02
    • 2019-04-21
    相关资源
    最近更新 更多