【问题标题】:ActiveJob perform_later from inside Redis subscription block来自 Redis 订阅块内部的 ActiveJob perform_later
【发布时间】:2021-12-11 15:34:22
【问题描述】:

我想使用 ruby​​ redis 客户端订阅频道。订阅成功后,我想使用perform_later 启动 ActiveJob(使用 Sidekiq)作业。这个 Job 然后会做一些工作并产生一个结果,该结果将发布到channel Redis 频道。 这会产生以下错误:

Processing by Api::LocationController#yyy as */*
  Parameters: {"zip_code"=>"503400"}
[ActiveJob] Failed enqueuing XXXJob to Sidekiq(default): NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=#<Redis::Connection::TCPSocket:fd 26>>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>)
Completed 500 Internal Server Error in 26ms (Allocations: 2374)


  
NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=nil>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>):

  
app/controllers/api/location_controller.rb:10:in `block (3 levels) in yyy'
app/controllers/api/location_controller.rb:8:in `block in yyy'
app/controllers/api/location_controller.rb:5:in `yyy'

yyy 函数是在 GET 请求上调用的控制器的一部分:

  def yyy
    REDIS_POOL.with { |r|
      data = r.get("key")
      if data.nil?
        r.subscribe("channel") { |on|
          on.subscribe {
            XXXJob.perform_later(params)
          }
          on.message { |_, message|
            puts message
            render json: message
            r.unsubscribe("channel")
          }
        }
      else
        render json: data
      end
    }
  end

在我看来,Redis subscribe 块中丢失了一些上下文,这对于 perform_later 函数很重要。如果我将XXXJob.perform_later(params) 调用移到r.subscribe("channel") 调用上方并因此移到块之外,则相同的代码可以工作。但是,我想确保该作业仅在订阅 Redis 频道成功后才开始。有什么方法可以实现吗?

【问题讨论】:

    标签: ruby-on-rails ruby redis sidekiq rails-activejob


    【解决方案1】:

    这很难。它不会起作用,因为 Sidekiq 的默认连接池总是重用线程的当前连接,该连接正在处理订阅调用并且该调用不可重入。需要手动创建单独的连接池,直接使用。

    ANOTHER_POOL = ConnectionPool.new { Redis.new }
    
    on.subscribe do
      Sidekiq::Client.via(POOL) do
        SomeWorker.perform_async(1,2,3)
        SomeOtherWorker.perform_async(1,2,3)
      end
    end
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-08-15
      • 2016-05-22
      • 1970-01-01
      相关资源
      最近更新 更多