【问题标题】:How to make a simple task queue如何制作一个简单的任务队列
【发布时间】:2013-07-12 10:04:03
【问题描述】:

我希望使用 RabbitMQ 和名为 AMQP 的 PHP 的 PECL 扩展创建一个简单的任务队列。

我的目标很简单: 生产者应该将消息发送到包含需要处理的对象的信封的特定队列。

消费者都应该听取所述队列并在消息到来时对其进行处理。 我需要能够添加更多消费者并让 RabbitMq 以循环方式发送消息。

虽然很容易找到 Python 或 Java 库的教程,但我找不到 PHP 的 PECL 库的教程。

我不太确定是否应该绑定任何东西,我有一个使用自定义 php 库的工作示例,该库使用“basic_publish 和 basic_consume”,PECL 库中没有以这种方式实现。

所以这是我到目前为止得到的: 发布者:

$oConfig = Zend_Registry::get('config');
$sQueue = $oConfig->amqp->validate_queue_name;

$oConnection = new AMQPConnection();
$oConnection->setLogin($oConfig->amqp->login);
$oConnection->setPassword($oConfig->amqp->pass);
$oConnection->setVhost($oConfig->amqp->vhost);
$oConnection->setPort($oConfig->amqp->port);
$oConnection->connect();

$oChannel = new AMQPChannel($oConnection);
$oExchange = new AMQPExchange($oChannel);

$sMsg = new stdClass();
$sMsg->nId = $p_nId;
$sMsg->nStatus= $p_nStatus;
try  {
  $oChannel->startTransaction();
  $bResponse = $oExchange->publish($sMgs,$sQueue);
  if (!$bResponse)  {
    echo "<h1>An error occured, the message can't be published</h1>";
    echo "<h3>Sorry i don't know why</h3>";
    exit;
  }
  $oChannel->commitTransaction();
}  catch (Exception $oException)  {
  echo "<h1>An error occured, the message can't be published</h1>";
  echo "<h3>See error below</h3>";
  echo "<pre>";
  echo print_r($oException->getMessage());
  echo "</pre>";
  exit;
}

工人

  $oConfig = Zend_Registry::get('config');
  $oConnection = new AMQPConnection();
  $oConnection->setLogin($oConfig->amqp->login);
  $oConnection->setPassword($oConfig->amqp->pass);
  $oConnection->setVhost($oConfig->amqp->vhost);
  $oConnection->setPort($oConfig->amqp->port);
  $oConnection->connect();

  $oChannel = new AMQPChannel($oConnection);
  $oQueue = new AMQPQueue($oChannel);

  $oQueue->declare($oConfig->amqp->validate_queue_name);

  function processMessage($oMessage, $oQueue) {
    $nId     = $msg->body->nId;
    $nStatus = $msg->body->nStatus;
    $oIniAct = $oActionMap->findBy('id',$nId);

    $sReply  = $oIniAct->updateStatusMisc($nStatus);
    if ($sReply->status == $nStatus)  {
      $oQueue->ack($sMsg['delivery_tag']);
    } else {
    $oQueue->nack($sMsg['delivery_tag'],AMQP_REQUEUE);
    }
  }

  $oQueue->consume("processMessage",AMQP_NOPARAM);

PHP 文档告诉我,consume() 会为每个人锁定相应的线程?所以基本上我一次只能有一名工作人员? 我还看到人们正在绑定队列,但是我看到的第一个具有基本消费​​的工人示例没有使用它。

如您所见,我很困惑,任何帮助/方向/教程 ASO...都会有所帮助

谢谢

【问题讨论】:

    标签: php rabbitmq amqp


    【解决方案1】:

    PHP 具有同步特性,所以是的,consume() 将锁定主线程,而底层逻辑是读取套接字连接上的所有传入数据,将其转换为 PHP 结构并提供给您的消费者函数。

    在 github 上讨论了使 php-amqp 异步的问题,但我们都同意,如果有人需要异步功能,那么 PHP 在设计上并不是最好的语言。

    我个人多次运行消费者脚本(事实上我有平衡器),所以每个消费者不会相互影响,他们可能会失败并独立重启。我认为你也可以这样做。

    我一次多次运行消费者脚本作为守护程序和平衡器脚本(实际上没有真正的负载平衡器),它监控消费者活动(通过 memcache 完成,不清楚,但是 WFM)以及当消费者平衡器上没有活动时杀死他们一个接一个(但至少有一个工作的消费者应该还活着)。当消费者过载时,平衡器脚本启动更多消费者。

    如果你需要消费一条消息然后死掉,让你的消费者函数返回false

    如果您确定队列至少有一条消息可用,您可能希望使用AMQPQueue::get() 方法,该方法不会阻塞(或至少不应该)您的主线程。

    【讨论】:

    • 什么意思?您在负载均衡器后面运行消费者脚本?我不确定我是否理解。基本上,如果我只出现两次该工作代码,它就行不通了?你是怎么做到的,你让你的工人只消费一条消息并立即死亡?这一切我都不清楚:s
    • 更新了我的答案。也许 get() 会满足您的需求?
    • 实际上我认为我可以阻止我的主线程。我想我理解多个消费者将能够以并行的方式工作。基本上这是我的一个误解,你和一堆阅读澄清了。谢谢!
    猜你喜欢
    • 2011-05-05
    • 2023-04-07
    • 1970-01-01
    • 1970-01-01
    • 2021-08-15
    • 2021-08-11
    • 1970-01-01
    • 1970-01-01
    • 2011-01-29
    相关资源
    最近更新 更多