【问题标题】:How best to keep a job queue clean of retry/duplicate jobs (using sidekiq and redis-semaphore)如何最好地保持作业队列中没有重试/重复作业(使用 sidekiq 和 redis-semaphore)
【发布时间】:2013-04-29 02:13:27
【问题描述】:

我有一个 Rails 应用程序,它可以从多个 IMAP 帐户中获取大量电子邮件。

  • 我使用 sidekiq 来处理这些工作。
  • 我使用 sidetiq 来安排作业。
  • 我使用 redis-semaphore 来确保同一用户的重复性工作不会相互干扰。

虽然有 2 个问题:

  • 1:当作业点击“if s.lock”时,redis-semaphore 将其暂停,直到所有之前的作业完成。我需要取消作业而不是排队。
  • 2:如果在作业期间引发异常,导致崩溃,sidekiq 会将作业放回队列中重试。我需要取消作业而不是排队。将“sidekiq_options :retry => false”放入代码中似乎没有什么不同。

我的代码:

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
    if s.lock
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
  end
end

【问题讨论】:

    标签: ruby redis mutex semaphore sidekiq


    【解决方案1】:

    1。创建一个Redis 子类并重载blpop 以接受-1 以实现lpop 的非阻塞使用。

    redis-semaphoreRedis::Semaphore#lock 中调用@redis.blpop。虽然您可以重载 lock 方法以改用 @redis.lpop,但更简单的方法是将 Redis 的自定义实例传递给信号量。

    将以下内容放在您的 rails 应用程序的 lib 中,并在您的 config/initializers/sidekiq.rb 中要求它(或执行任何您喜欢的加载以下类的操作)。

    class NonBlockingRedis < Redis
      def blpop(key, timeout)
        if timeout == -1
          result = lpop(key)
          return result if result.nil?
          return [key, result]
        else
          super(key, timeout)
        end
      end
    end
    

    每当您调用 Redis::Semaphore.new 时,传递一个带有 NonBlockingRedis 类的新实例的 :redis 键。

    使用-1 作为参数调用s.lock 以使用lpop 而不是blpop

    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
    if s.lock -1
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
    

    2。在您的工作人员类中使用 sidekiq_options retry: false 应该可以工作,请参见下面的示例。

    在您的问题中,您没有指定您遇到问题的工作人员最终进入重试队列。由于FetchMailsJobs 最终将ImapJob 工作排入队列,因此前者中的异常可能会导致看起来ImapJob 正在重新排队。

    使用信号量锁,将您的工作包装在 begin rescue ensure 块中也是一个好主意。

    class FetchMailsJobs
      include Sidekiq::Worker
      include Sidetiq::Schedulable
    
      sidekiq_options retry: false
    
      tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }
    
      def perform(last_occurrence, current_occurrence)
        users = User.all
        users.each do |user|
    
          if user.imap_accounts.exists?
            ImapJob.perform_async(user._id.to_s)
          end
        end
      end
    end
    
    class ImapJob
      include Sidekiq::Worker
    
      sidekiq_options retry: false
    
      def perform(user_id)
        s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
        if s.lock - 1
          begin
            user = User.where(_id: user_id).first
            emails = ImapMails.receive_mails(user)
          rescue => e
            # ignore; do nothing
          ensure
            s.unlock
          end
        end
      end
    end
    

    更多信息请参见sidekiq Advanced Options: workers

    【讨论】:

      【解决方案2】:

      难道不能放弃使用 redis-semaphore 并使用 sidekiq-unique-jobs gem 吗?它似乎是一个包含精美的工具,可以完全满足您的需求。

      【讨论】:

        猜你喜欢
        • 2020-12-13
        • 2020-11-17
        • 1970-01-01
        • 1970-01-01
        • 2013-06-16
        • 2017-09-23
        • 2021-02-16
        • 2020-08-30
        • 1970-01-01
        相关资源
        最近更新 更多