这可能不是最优雅的解决方案,但它似乎对我有用:
engine = db.create_engine(sqlalchemy_uri)
Base = declarative_base()
class Word(Base):
__tablename__ = "so64359277"
id = db.Column(db.Integer, primary_key=True)
word = db.Column(db.String(100))
def __repr__(self):
return f"<Word(id={self.id}, word='{self.word}')>"
Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# test data
word_objects = []
for x in [
"Hotel",
"Charlie",
"Alfa",
"India",
"Foxtrot",
"Echo",
"Bravo",
"Golf",
"Delta",
]:
word_objects.append(Word(word=x))
session.add_all(word_objects)
session.commit()
# show test data with id values
pprint(session.query(Word).all())
"""console output:
[<Word(id=1, word='Hotel')>,
<Word(id=2, word='Charlie')>,
<Word(id=3, word='Alfa')>,
<Word(id=4, word='India')>,
<Word(id=5, word='Foxtrot')>,
<Word(id=6, word='Echo')>,
<Word(id=7, word='Bravo')>,
<Word(id=8, word='Golf')>,
<Word(id=9, word='Delta')>]
"""
target_word = "Echo"
num_context_rows = 3
rowlist = session.query(
Word.id,
Word.word,
db.func.row_number().over(order_by=Word.word).label("rownum"),
).cte("rowlist")
target_rownum = session.query(rowlist.c.rownum).filter(
rowlist.c.word == target_word
)
select_subset = session.query(rowlist.c.rownum, rowlist.c.id).filter(
db.and_(
(rowlist.c.rownum >= target_rownum.scalar() - num_context_rows),
(rowlist.c.rownum <= target_rownum.scalar() + num_context_rows),
)
)
rownum_id_map = {x[0]: x[1] for x in select_subset.all()}
min_rownum = min(rownum_id_map)
max_rownum = max(rownum_id_map)
result = []
for rownum in range(min_rownum, max_rownum + 1):
result.append(session.query(Word).get(rownum_id_map[rownum]))
pprint(result)
"""console output:
[<Word(id=7, word='Bravo')>,
<Word(id=2, word='Charlie')>,
<Word(id=9, word='Delta')>,
<Word(id=6, word='Echo')>,
<Word(id=5, word='Foxtrot')>,
<Word(id=8, word='Golf')>]
<Word(id=1, word='Hotel')>]
"""