【发布时间】:2021-12-11 15:34:22
【问题描述】:
我想使用 Ruby 的 Redis 客户端订阅一个频道。订阅成功后,我想通过 perform_later 启动一个 ActiveJob 作业(后端使用 Sidekiq)。该作业会执行一些工作并产生一个结果,然后把结果发布到名为 channel 的 Redis 频道。
这会产生以下错误:
Processing by Api::LocationController#yyy as */*
Parameters: {"zip_code"=>"503400"}
[ActiveJob] Failed enqueuing XXXJob to Sidekiq(default): NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=#<Redis::Connection::TCPSocket:fd 26>>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>)
Completed 500 Internal Server Error in 26ms (Allocations: 2374)
NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=nil>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>):
app/controllers/api/location_controller.rb:10:in `block (3 levels) in yyy'
app/controllers/api/location_controller.rb:8:in `block in yyy'
app/controllers/api/location_controller.rb:5:in `yyy'
yyy 方法属于一个控制器,会在收到 GET 请求时被调用:
# Controller action (described in the surrounding text as handling a GET request).
#
# Reads "key" from Redis and renders it as JSON when a value is present.
# Otherwise it subscribes to "channel", enqueues XXXJob from the subscribe
# callback, and renders the message handed to the message callback before
# unsubscribing.
#
# NOTE(review): according to the log and stack trace quoted with this snippet,
# the XXXJob.perform_later call below fails with
# NoMethodError (undefined method `call_pipeline' for Redis::SubscribedClient).
# Presumably the enqueue ends up on the same connection that is currently
# inside the subscribe block -- confirm how REDIS_POOL and Sidekiq's Redis
# connection are configured.
def yyy
REDIS_POOL.with { |r| # r is whatever REDIS_POOL yields; presumably a Redis connection -- TODO confirm
data = r.get("key")
if data.nil? # no value under "key": wait for one to arrive on "channel"
r.subscribe("channel") { |on|
on.subscribe { # intended to run only once the subscription is established
XXXJob.perform_later(params) # the reported NoMethodError is raised here
}
on.message { |_, message| # first block argument is ignored
puts message
render json: message
r.unsubscribe("channel") # stop listening after handling this message
}
}
else
render json: data # value already available; no subscription needed
end
}
end
在我看来,Redis 的 subscribe 块中丢失了某些对 perform_later 很重要的上下文。如果我把 XXXJob.perform_later(params) 调用移到 r.subscribe("channel") 调用的上方(也就是移到块之外),同样的代码就可以正常工作。但是,我想确保只有在成功订阅 Redis 频道之后才启动该作业。有什么方法可以实现吗?
【问题讨论】:
标签: ruby-on-rails ruby redis sidekiq rails-activejob