【问题标题】:How can I inspect a queue in ActiveJobs?如何检查 ActiveJobs 中的队列?
【发布时间】:2017-10-17 22:46:58
【问题描述】:

在将作业排入队列之前,我想检查队列并查看队列中是否已经存在具有完全相同参数的作业,在这种情况下不要将作业排入队列。但我不知道我应该如何做到这一点。可能吗?

我知道我可以使用 TestHelper 在我的测试中轻松做到这一点。 TestHelper 依赖于我们当然不会在生产环境中使用的 TestAdapter。

多一点背景。在我们的 API 中,我们检索每个请求中的客户端版本号。我们使用 Intercom 提供支持,并希望在 Intercom 中显示应用程序版本,以便我们可以看到客户在解决支持问题时使用的版本。但是为了限制对 Intercom 的呼叫次数,我将每次发布到 Intercom 的时间延迟了几分钟,当一个帖子入队时,我不想让具有相同数据的新帖子入队。

我的问题与 List queued tasks with ActiveJob AsyncAdapter 有关,但该问题仅涉及排队工作的数量。

Efficiently reschedule ActiveJob (resque/sidekiq) 表示这是不可能的,我需要单独实施解决方案。

我是否可以使用 ActiveJobs 以某种方式检查队列及其中的作业,或者我是否需要跟踪我已排队的内容和已执行的内容?

【问题讨论】:

  • 我不知道ActiveJob对此的具体解决方案,但我知道你可以通过Sidekiq本身来做到这一点(如果你正在使用Sidekiq?)。如果这是一个可以接受的解决方案,那么我会写下我的答案。
  • @Jay-ArPolidario 是的,我正在使用 Sidekiq,但想要一个通用的解决方案,以便可以使用任何 QueueAdapter。建造起来不会很困难。使用作业类和参数的哈希存储对象的 fifo。然后设置一个推送到 fifo 的 after_enqueue 和一个从中提取的 before_perform。然后可以在执行之前检查fifo是否包含计划排队的内容。
  • 哦,我明白了,不幸的是,我在 ActiveJob 文档中找不到任何支持此功能的内容。因此,除非您想直接从 redis 队列本身获取队列详细信息,例如我的回答,或者执行您刚才所说的操作,并将对作业的引用存储在 fifo 存储中(可能在内存中?)。如果您只有一个 Sidekiq 工作进程在运行,那么您可以将它们存储在您刚才所说的 fifo(内存中)中,或者如果您有多个工作进程,那么您可能希望将这些作业引用存储到数据库中。

标签: ruby-on-rails rails-activejob


【解决方案1】:

以下是未完成的实现。它目前只支持Sidekiq(我假设你使用的是Sidekiq)。

注意:仅对此进行了部分测试,因此您可能需要更新以下一些内容。如果有时间,我会在稍后全面检查。

class ActiveJob::Base
  def self.where(jid: nil, arguments: [])
    found_jobs = []

    job_adapter = Rails.application.config.active_job.queue_adapter

    case job_adapter

    when :sidekiq
      queue = Sidekiq::Queue.new(queue_name)

      if jid.present?
        job = queue.find_job(jid)

        if job
          # if arguments supplied, check also if args match
          if arguments.present?
            should_disregard_job_class = self == ActiveJob::Base
            job_has_same_class = self.to_s == job.args[0]['job_class']
            job_has_same_arguments = arguments == job.args[0]['arguments'][3..-1]

            if (should_disregard_job_class || job_has_same_class) && job_has_same_arguments
              found_jobs << job
            end

          else
            found_jobs << job
          end
        end

      else
        # TODO: optimise below as it is slow
        queue.each do |job|
          should_disregard_job_class = self == ActiveJob::Base
          job_has_same_class = self.to_s == job.args[0]['job_class']
          job_has_same_arguments = arguments == job.args[0]['arguments'][3..-1]

          if (should_disregard_job_class || job_has_same_class) && job_has_same_arguments
            found_jobs << job
          end 
        end
      end

    when :resque
      # TODO
    else
      raise "TODO: missing Adapter implementation for #{job_adapter}"
    end

    found_jobs
  end
end

用法

jobs = ActiveJob::Base.where(jid: '12345678abcdef')
jobs = MyCustomJob.where(jid: '12345678abcdef')
jobs = MyCustomJob.where(arguments: ['firstjobargumentvalue', 'secondjobargumentvalue'])

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-12-19
    • 1970-01-01
    • 2020-09-15
    • 2013-05-17
    相关资源
    最近更新 更多