【问题标题】:Daemon-kit process one amqp job at a timeDaemon-kit 一次处理一个 amqp 作业
【发布时间】:2015-12-12 15:13:20
【问题描述】:

我们使用 daemon-kit 创建了一个 amqp 工作者,它应该接收一份工作,然后要求一份新工作,但不是在第一份工作完成之前。问题是,如果 RabbitMQ 队列中有一个,Daemon Kit 会分叉该作业并立即启动一个新作业。

在 daemon-kit 中是否有一种正式的方式来强制执行一次一项工作?或者我们怎样才能做到这一点?

这是我们如何启动 amqp 工作器和处理作业的简短版本。当作业完成并得到结果时,它会将其发布回 RabbitMQ 服务器。

# Run an event-loop for processing
DaemonKit::AMQP.run do |connection|

  connection.on_tcp_connection_loss do |client, settings|
    DaemonKit.logger.debug("AMQP connection status changed: #{client.status}")
    client.reconnect(false, 1)
  end

  amq = AMQP::Channel.new

  amq.queue(engine_key).subscribe do |metadata,msg|

    msg_decode = JSON.parse(msg)

    job = REFxEngineRunnerAPI10.new msg_decode
    result = job.run(metadata.correlation_id)

    amq.queue( metadata.reply_to, :auto_delete => false)

    xc = amq.default_exchange
    xc.publish JSON.dump(result), :routing_key => metadata.reply_to, :correlation_id => metadata.correlation_id
  end
end

更新

我发现这对我们有用:

DaemonKit::AMQP.run do |connection|

  amq = AMQP::Channel.new(connection, prefetch: 1)
  # I needs this extra line because I use RabbitMQ new than version 2.3.6
  amq.qos(0, 1) 

  # be sure to add (:ack => true)
  amq.queue(engine_key).subscribe(:ack => true) do |metadata,msg|

    #### run long job one at a time

    # Tell RabbitMQ I finished the job and I can now receive a new job
    metadata.ack

  end
end

【问题讨论】:

    标签: ruby rabbitmq daemon amqp


    【解决方案1】:

    我在这里暗中尝试,因为这对我来说听起来正是协议的行为方式。但是,您可以使用 QoSprefetching 来限制从代理向下发送到订阅者的消息数量,使用如下:

    amq = AMQP::Channel.new(connection, prefetch: 1)

    根据the example,这应该会给你你想要的行为。

    【讨论】:

      猜你喜欢
      • 2022-09-23
      • 2019-05-12
      • 1970-01-01
      • 2016-04-16
      • 2013-09-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-31
      相关资源
      最近更新 更多