【问题标题】:Using python async / await with django restframework将 python async / await 与 django restframework 一起使用
【发布时间】:2017-10-18 22:08:07
【问题描述】:

我只是将一个旧项目升级到 Python 3.6,并发现有这些很酷的新 async / await 关键字。

我的项目包含一个网络爬虫,目前性能不是很好,大约需要 7 分钟才能完成。 现在,由于我已经有了 django restframework 来访问我的 django 应用程序的数据,我认为最好有一个 REST 端点,我可以通过一个简单的 POST 请求从远程启动爬虫。

但是,我不希望客户端同步等待爬虫完成。我只想直接给他发爬虫已经启动的消息,在后台启动爬虫。

from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.response import Response
from django.conf import settings
from mycrawler import tasks

async def update_all_async(deep_crawl=True, season=settings.CURRENT_SEASON, log_to_db=True):
    await tasks.update_all(deep_crawl, season, log_to_db)


@api_view(['POST', 'GET'])
def start(request):
    """
    Start crawling.
    """
    if request.method == 'POST':
        print("Crawler: start {}".format(request))

        deep = request.data.get('deep', False)
        season = request.data.get('season', settings.CURRENT_SEASON)

        # this should be called async
        update_all_async(season=season, deep_crawl=deep)

        return Response({"Success": {"crawl finished"}}, status=status.HTTP_200_OK)
    else:
        return Response ({"description": "Start the crawler by calling this enpoint via post.", "allowed_parameters": {
            "deep": "boolean",
            "season": "number"
        }}, status.HTTP_200_OK)

我已经阅读了一些教程,以及如何使用循环和东西,但我真的不明白......在这种情况下我应该从哪里开始循环?

[编辑] 2017 年 10 月 20 日:

我现在使用线程解决了它,因为它确实是一个“即发即弃”的任务。但是,我仍然想知道如何使用 async / await 来实现同样的目的。

这是我目前的解决方案:

import threading


@api_view(['POST', 'GET'])
def start(request):
    ...
    t = threading.Thread(target=tasks.update_all, args=(deep, season))
    t.start()
    ...

【问题讨论】:

    标签: django python-3.x asynchronous


    【解决方案1】:

    这在 Django 3.1+ 中是可能的,在 introducing asynchronous support 之后。

    关于异步运行循环,您可以通过使用uvicorn 或任何其他ASGI 服务器而不是gunicorn 或其他WSGI 服务器运行Django 来使用它。 不同之处在于,在使用 ASGI 服务器时,已经有一个运行循环,而在使用 WSGI 时则需要创建一个。使用 ASGI,您可以直接在 views.py 或其 View Classes 的继承函数下定义 async 函数。

    假设您使用 ASGI,您有多种方法可以实现这一点,我将描述其中的几种(例如,其他选项可以使用 asyncio.Queue):

    1. 使start()异步

    通过使start()异步,您可以直接使用现有的运行循环,通过使用asyncio.Task,您可以触发并忘记到现有的运行循环。如果你想开火但要记住,你可以创建另一个Task 来跟进这个,即:

    from rest_framework import status
    from rest_framework.decorators import api_view
    from rest_framework.response import Response
    from django.conf import settings
    from mycrawler import tasks
    
    import asyncio
    
    async def update_all_async(deep_crawl=True, season=settings.CURRENT_SEASON, log_to_db=True):
        await tasks.update_all(deep_crawl, season, log_to_db)
    
    async def follow_up_task(task: asyncio.Task):
        await asyncio.sleep(5) # Or any other reasonable number, or a finite loop...
        if task.done():
            print('update_all task completed: {}'.format(task.result()))
        else:
            print('task not completed after 5 seconds, aborting')
            task.cancel()
    
    
    @api_view(['POST', 'GET'])
    async def start(request):
        """
        Start crawling.
        """
        if request.method == 'POST':
            print("Crawler: start {}".format(request))
    
            deep = request.data.get('deep', False)
            season = request.data.get('season', settings.CURRENT_SEASON)
    
            # Once the task is created, it will begin running in parallel
            loop = asyncio.get_running_loop()
            task = loop.create_task(update_all_async(season=season, deep_crawl=deep))
    
            # Fire up a task to track previous down
            loop.create_task(follow_up_task(task))
    
            return Response({"Success": {"crawl finished"}}, status=status.HTTP_200_OK)
        else:
            return Response ({"description": "Start the crawler by calling this enpoint via post.", "allowed_parameters": {
                "deep": "boolean",
                "season": "number"
            }}, status.HTTP_200_OK)
    
    1. async_to_sync

    有时您不能只使用async 函数将请求首先路由到as it happens with DRF(截至今天)。 为此,Django 提供了一些有用的async adapter functions,但请注意,从同步上下文切换到异步上下文或反之亦然,a small performance penalty 大约需要 1 毫秒。请注意,这一次,在 update_all_sync 函数中收集的运行循环改为:

    from rest_framework import status
    from rest_framework.decorators import api_view
    from rest_framework.response import Response
    from django.conf import settings
    from mycrawler import tasks
    
    import asyncio
    from asgiref.sync import async_to_sync
    
    @async_to_sync
    async def update_all_async(deep_crawl=True, season=settings.CURRENT_SEASON, log_to_db=True):
        #We can use the running loop here in this use case
        loop = asyncio.get_running_loop()
        task = loop.create_task(tasks.update_all(deep_crawl, season, log_to_db))
        loop.create_task(follow_up_task(task))
    
    async def follow_up_task(task: asyncio.Task):
        await asyncio.sleep(5) # Or any other reasonable number, or a finite loop...
        if task.done():
            print('update_all task completed: {}'.format(task.result()))
        else:
            print('task not completed after 5 seconds, aborting')
            task.cancel()
    
    
    @api_view(['POST', 'GET'])
    def start(request):
        """
        Start crawling.
        """
        if request.method == 'POST':
            print("Crawler: start {}".format(request))
    
            deep = request.data.get('deep', False)
            season = request.data.get('season', settings.CURRENT_SEASON)
    
            # Make update all "sync"
            sync_update_all_sync = async_to_sync(update_all_async)
            sync_update_all_sync(season=season, deep_crawl=deep)
    
            return Response({"Success": {"crawl finished"}}, status=status.HTTP_200_OK)
        else:
            return Response ({"description": "Start the crawler by calling this enpoint via post.", "allowed_parameters": {
                "deep": "boolean",
                "season": "number"
            }}, status.HTTP_200_OK)
    

    在这两种情况下,函数都会快速返回 200,但从技术上讲,第二个选项更慢。

    重要提示:在使用 Django 时,这些异步操作通常会涉及数据库操作。至少目前,Django 中的数据库操作只能是同步的,因此您必须在异步上下文中考虑这一点。 sync_to_async() 在这些情况下变得非常方便。

    【讨论】:

    • 第一个示例中的路径操作函数 (start) 不会等待任何内容,因此它实际上会阻塞事件循环。
    • create_task() 将保证任务执行,无论是否等待(例如,参见this answer)。您当然可以在follow_up_task() 中等待该实例,或者您可以像使用Task.done()Task.result() 的示例一样检查其状态。因为如果你不await,你就不会阻止...试试我创建的this example 来说明它。 (需要 python-3.7+)
    • 感谢您抽出宝贵时间回答这个老问题!我不再真正维护我遇到此问题的项目,但您的回答似乎可以解决我的问题。
    【解决方案2】:

    在我看来,你应该看看celery,这是一个专门为异步任务设计的好工具。它支持 Django,当您不希望用户在服务器上等待长时间操作时,它非常有用。在后台运行的每个任务都会收到一个 task_id,如果您想创建另一个服务,它可以帮助您,给定一个 task_id,返回特定任务是否成功或没有,或者到目前为止已经完成了多少。

    【讨论】:

    • 我以前用过 celery,但对于我的用例来说,这似乎有点矫枉过正 :) 也许我还是会尝试一下
    • 在我看来,Celery 是完成这项工作的正确工具。
    猜你喜欢
    • 2017-11-05
    • 2019-04-15
    • 2014-06-19
    • 2018-01-14
    • 2018-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多