【问题标题】:Flask Celery Python import烧瓶芹菜 Python 导入
【发布时间】:2017-02-03 19:41:05
【问题描述】:

在我的 Flask 应用程序中集成 Celery 时遇到问题。 这是回购https://github.com/theobouwman/community-python

我通过运行 app.py 来启动我的应用程序,它会导入我的应用程序(其中添加了蓝图和配置)和 Celery。

/tasks/add.py 中,我有一个示例任务,并在其中再次为@celery.task 装饰器导入Celery 对象。

到那时一切正常。我可以运行我的 Flask 应用程序并运行 Celery worker。

但是当我尝试从蓝图中的控制器内触发任务时,就像这里 https://github.com/theobouwman/community-python/blob/master/auth/controllers/RegistrationController.py#L38 它说它无法导入它,这是一个逻辑反应。

Traceback (most recent call last):
  File "app.py", line 2, in <module>
    from flask_app import app
  File "/development/projects/python/Community/flask_app.py", line 4, in <module>
    from auth.routes import auth
  File "/development/projects/python/Community/auth/routes.py", line 3, in <module>
    from .controllers import RegistrationController, AuthenticationController, LogoutController
  File "/development/projects/python/Community/auth/controllers/RegistrationController.py", line 10, in <module>
    from tasks.add import add
  File "/development/projects/python/Community/tasks/add.py", line 1, in <module>
    from app import celery
  File "/development/projects/python/Community/app.py", line 2, in <module>
    from flask_app import app
ImportError: cannot import name 'app'

我不知道如何解决这个导入周期,这就是这个问题的原因。我用谷歌搜索了 3 个小时,但找不到解决方案。 我希望这里有人可以帮助我。

空气中有 Flask Slack 或 Gitter 吗?

提前致谢。

【问题讨论】:

标签: python flask celery python-import


【解决方案1】:

RegistrationController.py中的导入更改为本地,以解决循环导入:


    from ..blueprint import auth
    from models import User
    from flask import redirect, url_for, request, render_template, flash
    import bcrypt
    from ..forms.register import SimpleRegistrationForm
    """
    Error in python3.6 app.py
    Says cyclus import error
    """
    # Comment out the line below
    # from tasks.add import add


    @auth.route('/register', methods=['GET', 'POST'])
    def register():
        form = SimpleRegistrationForm(request.form)
        if request.method == 'POST' and form.validate():
            fname = request.form['fname']
            sname = request.form['sname']
            email = request.form['email']
            password = request.form['password']
            hashed = bcrypt.hashpw(password.encode('utf-8 '), bcrypt.gensalt())

            user = User.select().where(User.email == email)
            if user.exists():
                flash('Er bestaat al een account met dit email adres')
                return redirect(url_for('auth.register'))

            user = User(fname=fname, sname=sname, email=email, password=hashed)
            user.save()

            flash('Uw account is aangemaakt. Kijk in uw mailbox voor de activatie link')
            return redirect(url_for('auth.register'))
        return render_template('pages/register.html', form=form)


    @auth.route('/register/test')
    def register_test():
        # local import avoids the cycle
        from tasks.add import add
        add.delay()
        # hashed = bcrypt.hashpw('test'.encode('utf-8 '), bcrypt.gensalt())
        # user = User(
        #     fname='Theo',
        #     sname='Bouwman',
        #     email='theobouwman98@gmail.com',
        #     password=hashed
        # )
        # user.save()
    return redirect(url_for('auth.login'))

【讨论】:

  • 我以为这样导入模块是违反python的导入规则的吧?
  • 如果这个回答对你有帮助,请点赞并采纳!
  • 请不要犹豫
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-12-16
  • 2012-08-22
  • 1970-01-01
  • 1970-01-01
  • 2018-08-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多