【发布时间】:2015-03-01 17:28:01
【问题描述】:
我正在尝试让这个原则发挥作用:
- 一个生产者发送一个消息(1)并等待包含一些结果的ack(实际上是一个操作的json结果)
- 每 5 秒检查所有待处理消息的消费者,并在一行中处理所有消息,并在一行中确认所有消息,然后再次等待 5 秒(无限循环)。
这是我stompproducer.php的30行:
<?php
function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}
$queue = '/aaaa';
$msg = 'bar';
if (count($argv)<3) {
echo $argv[0]." [msg] [nb to send]\n";
exit(1);
}
$msg = (string)$argv[1];
$to_send = intval($argv[2]);
try {
$stomp = new Stomp('tcp://localhost:61613');
while (--$to_send) {
msg("Sending...");
$result = $stomp->send(
$queue,
$msg." ". date("Y-m-d H:i:s"),
array('receipt' => 'message-123')
);
echo 'result='.var_export($result,true)."\n";
msg("Done.");
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}
这是我stompconsumer.php的30行:
<?php
$queue = '/aaaa';
$_waitTimer=5000000;
$_timeLastAsk = microtime(true);
function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}
try {
$stomp = new Stomp('tcp://localhost:61613');
$stomp->subscribe($queue, array('activemq.prefetchSize' => 40));
$stomp->setReadTimeout(0, 10000);
while (true) {
$frames_read=array();
while ($stomp->hasFrame()) {
$frame = $stomp->readFrame();
if ($frame != null) {
array_push($frames_read, $frame);
}
if (count($frames_read)==40) {
break;
}
}
msg("Nombre de frames lues : ".count($frames_read));
msg("Pause...");
$e=$_waitTimer-(microtime(true)-$_timeLastAsk);
if ($e>0) {
usleep($e);
}
if (count($frames_read)>0) {
msg("Ack now...");
foreach ($frames_read as $frame) {
$stomp->ack($frame);
}
}
$_timeLastAsk = microtime(true);
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}
我无法做到同步生产者,即等待消费者确认的生产者。如果您运行我在此处完成的示例,您会看到生产者 立即 发送所有消息,然后退出,并在调用 $stomp->send() 时返回所有“true”(如“ok”)结果。
我仍然没有找到好的示例,也没有找到带有简单阻塞示例的好的文档。
我应该怎么做才能让我的生产者阻塞直到消费者发送它的确认?
注意:我已阅读所有文档 here 以及 stackoverflow here 和 here 上的 stomp php 问题。
【问题讨论】:
-
为什么要让生产者等到消费者确认消息?在任何时候,消息都会由一个人负责,因此如果您确保不会将消息从生产者丢失到代理,您可以让生产者和消费者按照自己的节奏运行。生产者需要等待消费者确认这一点似乎与消息队列的使用背道而驰。你有什么强烈的理由来满足你的要求吗?