【发布时间】:2011-07-28 18:00:52
【问题描述】:
我有一个 django 应用程序,它在查询集上调用异步任务(使用 celery)。该任务获取查询集并执行一大堆操作,根据其中的对象,这些操作可能需要很长时间。对象可以跨查询集共享,因此用户可以在包含已经运行的对象的查询集上提交任务,并且新任务应该只在尚未运行的对象上执行,但要等待所有对象完成在它返回之前。
我的解释有点混乱,想象一下下面的代码:
from time import sleep
import redis
from celery.task import Task
from someapp.models import InterestingModel
from someapp.longtime import i_take_a_while
class LongRunningTask(Task):
def run(self, process_id, *args, **kwargs):
_queryset = InterestingModel.objects.filter(process__id=process_id)
r = redis.Redis()
p = r.pipeline()
run_check_sets = ('run_check', 'objects_already_running')
# There must be a better way to do this:
for o in _queryset.values_list('pk', flat=True):
p.sadd('run_check')
p.sdiff(run_check_sets) # Objects that need to be run
p.sunion(run_check_sets) # Objects that we need to wait for
p.sunionstore('objects_already_running',run_check_sets)
p.delete('run_check')
redis_result = p.execute()
objects_to_run = redis_result[-3]
objects_to_wait_for = redis_result[-2]
if objects_to_run:
i_take_a_while(objects_to_run)
p = r.pipeline()
for o in objects_to_run:
p.srem('objects_already_running', o)
p.execute()
while objects_to_wait_for:
p = r.pipeline()
for o in objects_to_wait_for:
p.sismember('objects_already_running',o)
redis_result = p.execute()
objects_to_wait_for = [objects_to_wait_for[i] for i, member in enumerate(redis_result) if member]
# Probably need to add some sort of timeout here or in redis
sleep(30)
我对 Redis 非常陌生,所以我的主要问题是是否有更有效的方法来操纵 Redis 以达到相同的结果。更广泛地说,我想知道 Redis 是否是必要的/处理这个问题的正确方法。似乎应该有更好的方法来将 Django 模型与 Redis 交互。最后,我想知道这段代码是否实际上是线程安全的。谁能在我的逻辑上打洞?
欢迎评论。
【问题讨论】:
标签: python django thread-safety redis celery