【问题标题】:Flask-SQLALchemy update record automatically after specific timeFlask-SQLALchemy 在特定时间后自动更新记录
【发布时间】:2019-12-09 02:04:56
【问题描述】:

我有一个这样的数据库模型:

class Payment(db.Model):
    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    ticket_status = db.Column(db.Enum(TicketStatus, name='ticket_status', default=TicketStatus.UNUSED))
    departure_time = db.Column(db.Date)

datetime.utcnow() 传递来自departure_time 的日期值之后,我想更改所有ticket_status 的值。

我尝试这样编写代码:

class TicketStatus(enum.Enum):
    UNUSED = 'UNUSED'
    USED = 'USED'
    EXPIRED = 'EXPIRED'

    def __repr__(self):
        return str(self.value)


class Payment(db.Model):
    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    ticket_status = db.Column(db.Enum(TicketStatus, name='ticket_status', default=TicketStatus.UNUSED))
    departure_time = db.Column(db.Date)

    # TODO | set ticket expirations time
    def __init__(self):
        if datetime.utcnow() > self.departure_time:
            self.ticket_status = TicketStatus.EXPIRED.value
        try:
            db.session.add(self)
            db.session.commit()
        except Exception as e:
            db.session.rollback()

我也试过这样:

def ticket_expiration(self, payment_id):
    now = datetime.utcnow().strftime('%Y-%m-%d')
    payment = Payment.query.filter_by(id=payment_id).first()
    if payment.ticket_status.value == TicketStatus.USED.value:
        pass
    elif payment and str(payment.departure_time) < now:
        payment.ticket_status = TicketStatus.EXPIRED.value
    elif payment and str(payment.departure_time) >= now:
        payment.ticket_status = TicketStatus.UNUSED.value
    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
    return str('ok')

但是当datetime.utcnow() 传递来自departure_time 的日期值时似乎没有效果。

所以我的问题的重点是,如何在一组时间后自动更改一行的值..?

【问题讨论】:

    标签: python flask scheduled-tasks flask-sqlalchemy scheduler


    【解决方案1】:

    您可以将您的状态列替换为“已使用”列,该列将包含布尔值并为状态创建一个混合属性。 https://docs.sqlalchemy.org/en/13/orm/extensions/hybrid.html

    class Payment(db.Model):
        id = db.Column(db.Integer(), primary_key=True)
        user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
        used = db.Column(db.Boolean(), default=False)
        departure_time = db.Column(db.Date)
    
        @hybrid_property
        def status(self):
            if datetime.utcnow() > self.departure_time:
                return "EXPIRED"
            elif self.used:
                return "USED"
            return "UNUSED"
    

    【讨论】:

    • 更改 if 的优先级以根据您的逻辑优化获取属性,这可能会很好
    【解决方案2】:

    最后我通过使用 flask_apscheduler 解决了这个问题,这是我解决这个问题的代码的 sn-p:

    安装flask_apscheduler

    pip3 install flask_apscheduler
    

    创建新模块tasks.py

    from datetime import datetime
    
    from flask_apscheduler import APScheduler
    
    from app import db
    from app.models import Payment, TicketStatus
    
    scheduler = APScheduler()
    
    
    def ticket_expiration():
        utc_now = datetime.utcnow().strftime('%Y-%m-%d')
        app = scheduler.app
        with app.app_context():
            payment = Payment.query.all()
            for data in payment:
                try:
                    if data.ticket_status.value == TicketStatus.USED.value:
                        pass
                    elif str(data.departure_time) < utc_now:
                        data.ticket_status = TicketStatus.EXPIRED.value
                    elif str(data.departure_time) >= utc_now:
                        data.ticket_status = TicketStatus.UNUSED.value
                except Exception as e:
                    print(str(e))
                try:
                    db.session.commit()
                except Exception as e:
                    db.session.rollback()
        return str('ok')
    

    然后在 __init__.py

    中将包注册到烧瓶应用中
    def create_app(config_class=Config):
        app = Flask(__name__)
        app.config.from_object(Config)
        # The other packages...
        # The other packages...
        scheduler.init_app(app)
        scheduler.start()
    
        return app
    
    # import from other_module...
    # To avoid SQLAlchemy circular import, do the import at the bottom.
    from app.tasks import scheduler 
    

    这里是 config.py

    class Config(object):
        # The others config...
        # The others config...
    
        # Flask-apscheduler
        JOBS = [
            {
                'id': 'ticket_expiration',
                'func': 'app.tasks:ticket_expiration',
                'trigger': 'interval',
                'hours': 1, # call the task function every 1 hours
                'replace_existing': True
            }
        ]
        SCHEDULER_JOBSTORES = {
            'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db')
        }
        SCHEDULER_API_ENABLED = True
    

    在上面的配置中,我们可以根据我们的情况调用函数每隔1小时,秒或其他时间更新db,更多设置间隔时间的信息可以看到here

    我希望这个答案对将来面临这个问题的人有所帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-07-13
      • 2016-10-16
      • 1970-01-01
      • 1970-01-01
      • 2020-09-08
      • 2021-06-29
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多