【问题标题】:Redis still fills up when results_ttl=0, Why?当 results_ttl=0 时,Redis 仍然会填满,为什么?
【发布时间】:2014-10-28 12:54:54
【问题描述】:

问题:如果作业的结果立即丢弃,为什么redis会填满?

我使用 redis 作为队列来异步创建 PDF,然后将结果保存到我的数据库中。由于它已保存,我不需要在以后访问该对象,因此我不需要在处理结果后将结果存储在 Redis 中。

为了防止结果留在redis中,我将TTL设置为0

parameter_dict = {          
    "order": serializer.object,
    "photo": base64_image,
    "result_ttl": 0
}
django_rq.enqueue(procces_template, **parameter_dict)

问题是虽然redis worker说job马上过期了:

15:33:35 Job OK, result = John Doe's nail order to 568 Broadway
15:33:35 Result discarded immediately.
15:33:35
15:33:35 *** Listening on high, default, low...

Redis 还是满了就扔了:

ResponseError: command not allowed when used memory > 'maxmemory'

如果作业结果尚未存储,我是否需要在 redis / django-rq 中设置另一个参数以防止 redis 填满?


更新:

post 之后,我预计内存可能会因为 redis 中的失败作业而被填满。

使用此代码sn-p:

def print_redis_failed_queue():
    q = django_rq.get_failed_queue()
    while True:
        job = q.dequeue()
        if not job:
            break
        print job

这里是 redis 中密钥转储的粘贴箱:

http://pastebin.com/Bc4bRyRR

在这里发帖太实用了。它的大小似乎支持我的理论。但是使用:

def delete_redis_failed_queue():
    q = django_rq.get_failed_queue()
    count = 0
    while True:
        job = q.dequeue()
        if not job:
            print "{} Jobs deleted.".format(count)
            break
        job.delete()
        count += 1  

是否像我期望的那样清除 redis。如何更准确地转储 redis 中的键?我是否正确清除作业?

【问题讨论】:

    标签: python django redis python-rq django-rq


    【解决方案1】:

    事实证明,Redis 是因为孤立的工作而被填满的,即。未分配给特定队列的作业。

    虽然孤儿作业的原因未知,但问题通过这个 sn-p 解决了:

    import redis
    from rq.queue import Queue, get_failed_queue
    from rq.job import Job
    redis = Redis()
    for i, key in enumerate(self.redis.keys('rq:job:*')):
        job_number = key.split("rq:job:")[1]
        job = Job.fetch(job_number, connection=self.redis)
        job.delete()
    

    在我的特殊情况下,调用这个sn-p,(实际上是下面的delete_orphaned_jobs()方法),在每个作业竞争之后确保Redis不会被填满,并且孤儿作业会得到照顾。有关该问题的更多详细信息,请访问已打开的django-rq issue 中的对话链接。

    在诊断此问题的过程中,我还创建了一个utility class,用于轻松检查和删除作业/孤立作业:

    class RedisTools:
        '''
        A set of utility tools for interacting with a redis cache
        '''
    
        def __init__(self):
            self._queues = ["default", "high", "low", "failed"]
            self.get_redis_connection()
    
        def get_redis_connection(self):
            redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
            self.redis = redis.from_url(redis_url)
    
        def get_queues(self):
            return self._queues
    
        def get_queue_count(self, queue):
            return Queue(name=queue, connection=self.redis).count
    
        def msg_print_log(self, msg):
            print msg
            logger.info(msg)
    
        def get_key_count(self):
            return len(self.redis.keys('rq:job:*'))
    
        def get_queue_job_counts(self):
            queues = self.get_queues()
            queue_counts = [self.get_queue_count(queue) for queue in queues]
            return zip(queues, queue_counts)
    
        def has_orphanes(self):
            job_count = sum([count[1] for count in self.get_queue_job_counts()])
            return job_count < self.get_key_count()
    
        def print_failed_jobs(self):
            q = django_rq.get_failed_queue()
            while True:
                job = q.dequeue()
                if not job:
                    break
                print job
    
        def print_job_counts(self):
            for queue in self.get_queue_job_counts():
                print "{:.<20}{}".format(queue[0], queue[1])
            print "{:.<20}{}".format('Redis Keys:', self.get_key_count())
    
        def delete_failed_jobs(self):
            q = django_rq.get_failed_queue()
            count = 0
            while True:
                job = q.dequeue()
                if not job:
                    self.msg_print_log("{} Jobs deleted.".format(count))
                    break
                job.delete()
                count += 1
    
        def delete_orphaned_jobs(self):
            if not self.has_orphanes():
                return self.msg_print_log("No orphan jobs to delete.")
    
            for i, key in enumerate(self.redis.keys('rq:job:*')):
                job_number = key.split("rq:job:")[1]
                job = Job.fetch(job_number, connection=self.redis)
                job.delete()
                self.msg_print_log("[{}] Deleted job {}.".format(i, job_number))
    

    【讨论】:

      【解决方案2】:

      您可以将http://python-rq.org/docs/exceptions/ 中的“黑洞”异常处理程序与job.cancel() 一起使用:

      def black_hole(job, *exc_info):
          # Delete the job hash on redis, otherwise it will stay on the queue forever
          job.cancel()
          return False
      

      【讨论】:

      • 甜,我去看看。
      猜你喜欢
      • 2017-05-12
      • 1970-01-01
      • 1970-01-01
      • 2019-06-10
      • 1970-01-01
      • 2016-10-15
      • 1970-01-01
      • 2022-08-03
      • 1970-01-01
      相关资源
      最近更新 更多