【问题标题】:thread safety in django with asynchronous tasks and redisdjango中的线程安全与异步任务和redis
【发布时间】:2011-07-28 18:00:52
【问题描述】:

我有一个 django 应用程序,它在查询集上调用异步任务(使用 celery)。该任务获取查询集并执行一大堆操作,根据其中的对象,这些操作可能需要很长时间。对象可以跨查询集共享,因此用户可以在包含已经运行的对象的查询集上提交任务,并且新任务应该只在尚未运行的对象上执行,但要等待所有对象完成在它返回之前。

我的解释有点混乱,想象一下下面的代码:

from time import sleep
import redis
from celery.task import Task
from someapp.models import InterestingModel
from someapp.longtime import i_take_a_while

class LongRunningTask(Task):
    def run(self, process_id, *args, **kwargs):
        _queryset = InterestingModel.objects.filter(process__id=process_id)

        r = redis.Redis()
        p = r.pipeline()
        run_check_sets = ('run_check', 'objects_already_running')

        # There must be a better way to do this:
        for o in _queryset.values_list('pk', flat=True):
            p.sadd('run_check')
        p.sdiff(run_check_sets) # Objects that need to be run
        p.sunion(run_check_sets) # Objects that we need to wait for
        p.sunionstore('objects_already_running',run_check_sets)
        p.delete('run_check')
        redis_result = p.execute()

        objects_to_run = redis_result[-3]
        objects_to_wait_for = redis_result[-2]

        if objects_to_run:
            i_take_a_while(objects_to_run)
            p = r.pipeline()
            for o in objects_to_run:
                p.srem('objects_already_running', o)
            p.execute()

        while objects_to_wait_for:
            p = r.pipeline()
            for o in objects_to_wait_for:
                p.sismember('objects_already_running',o)
            redis_result = p.execute()
            objects_to_wait_for = [objects_to_wait_for[i] for i, member in enumerate(redis_result) if member]
            # Probably need to add some sort of timeout here or in redis
            sleep(30) 

我对 Redis 非常陌生,所以我的主要问题是是否有更有效的方法来操纵 Redis 以达到相同的结果。更广泛地说,我想知道 Redis 是否是必要的/处理这个问题的正确方法。似乎应该有更好的方法来将 Django 模型与 Redis 交互。最后,我想知道这段代码是否实际上是线程安全的。谁能在我的逻辑上打洞?

欢迎评论。

【问题讨论】:

    标签: python django thread-safety redis celery


    【解决方案1】:

    您是否有可能以稍微不同的方式构建它?具体来说,我会启动每个对象的任务,然后将有关您长时间运行的作业的信息存储在某处(例如,数据库、缓存等)。当每个单独的对象完成后,它会更新长时间运行的作业信息并检查是否所有作业都已返回。如果是这样,那么您可以在长时间运行的任务完成后运行任何需要运行的代码。

    这样做的好处是在您等待其他事情发生时不会占用服务器上的线程。在客户端,您可以定期检查长时间运行的作业的状态,甚至可以根据需要使用完成的对象数来更新进度表。

    【讨论】:

    • 我实际上就是这么做的!
    猜你喜欢
    • 2015-09-14
    • 2013-08-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多