【问题标题】:Activemq and Php Stomp: synchronous producer sampleActivemq 和 Php Stomp:同步生产者示例
【发布时间】:2015-03-01 17:28:01
【问题描述】:

我正在尝试让这个原则发挥作用:

  • 一个生产者发送一个消息(1)并等待包含一些结果的ack(实际上是一个操作的json结果)
  • 每 5 秒检查所有待处理消息的消费者,并在一行中处理所有消息,并在一行中确认所有消息,然后再次等待 5 秒(无限循环)。

这是我stompproducer.php的30行:

<?php

function msg($txt)
{
    echo date('H:i:s > ').$txt."\n";
}

$queue  = '/aaaa';
$msg    = 'bar';
if (count($argv)<3) {
    echo $argv[0]." [msg] [nb to send]\n";
    exit(1);
}
$msg     = (string)$argv[1];
$to_send = intval($argv[2]);

try {
    $stomp = new Stomp('tcp://localhost:61613');
    while (--$to_send) {
        msg("Sending...");
        $result = $stomp->send(
            $queue,
            $msg." ". date("Y-m-d H:i:s"),
            array('receipt' => 'message-123')
        );
        echo 'result='.var_export($result,true)."\n";
        msg("Done.");
    }
} catch(StompException $e) {
    die('Connection failed: ' . $e->getMessage());
}

这是我stompconsumer.php的30行:

<?php

$queue  = '/aaaa';
$_waitTimer=5000000;
$_timeLastAsk = microtime(true);

function msg($txt)
{
    echo date('H:i:s > ').$txt."\n";
}

try {
    $stomp = new Stomp('tcp://localhost:61613');
    $stomp->subscribe($queue, array('activemq.prefetchSize' => 40));
    $stomp->setReadTimeout(0, 10000);
    while (true) {
        $frames_read=array();
        while ($stomp->hasFrame()) {
            $frame = $stomp->readFrame();
            if ($frame != null) {
                array_push($frames_read, $frame);
            }
            if (count($frames_read)==40) {
                break;
            }
        }
        msg("Nombre de frames lues : ".count($frames_read));
        msg("Pause...");
        $e=$_waitTimer-(microtime(true)-$_timeLastAsk);
        if ($e>0) {
            usleep($e);
        }
        if (count($frames_read)>0) {
            msg("Ack now...");
            foreach ($frames_read as $frame) {
                $stomp->ack($frame);
            }
        }
        $_timeLastAsk = microtime(true);
    }
} catch(StompException $e) {
    die('Connection failed: ' . $e->getMessage());
}

我无法做到同步生产者,即等待消费者确认的生产者。如果您运行我在此处完成的示例,您会看到生产者 立即 发送所有消息,然后退出,并在调用 $stomp-&gt;send() 时返回所有“true”(如“ok”)结果。 我仍然没有找到好的示例,也没有找到带有简单阻塞示例的好的文档。

我应该怎么做才能让我的生产者阻塞直到消费者发送它的确认?

注意:我已阅读所有文档 here 以及 stackoverflow herehere 上的 stomp php 问题。

【问题讨论】:

  • 为什么要让生产者等到消费者确认消息?在任何时候,消息都会由一个人负责,因此如果您确保不会将消息从生产者丢失到代理,您可以让生产者和消费者按照自己的节奏运行。生产者需要等待消费者确认这一点似乎与消息队列的使用背道而驰。你有什么强烈的理由来满足你的要求吗?

标签: php activemq stomp


【解决方案1】:

我才想起来,你可以试试 reactphp/stomp 库。 这是一个事件驱动的库,可能会对您有所帮助。特地看看ad核心功能addPeriodicTimer

https://github.com/reactphp/stomp

干杯

【讨论】:

    【解决方案2】:

    我想到的第一件事:看看这个 stomp 插件:

    http://activemq.apache.org/message-redelivery-and-dlq-handling.html

    我能想到的另一个解决方法是: 在生产者方面: 1. 更改您的生产者以发送持久消息

    在您的消费者方面: 使用计时器。 1.阅读消息/帧,直到空或达到最大上限。 2. 创建一个 CURL 请求并清空打包的消息列表 3. 让你的服务器休眠 5 秒

    您肯定需要进一步测试,但应该可以。进程唤醒后,您应该能够读取所有排队的消息。

    需要考虑的事项: - 持久消息需要一个过期时间 - 您需要消费者方面的 ACK 以确保更新已参加的消息的状态。使用 ACK=client 以便您可以确认所有已确认的消息 - 如果您不必等待 CURL 响应,会更容易。 - 开箱即用,不支持从消费者(服务器端)发送 ACK。

    祝你好运

    【讨论】:

      【解决方案3】:

      从问题看来,您正在寻找请求/响应类型的消息传递模式。这是您必须自己实现的东西,因为您引用的 STOMP ack 只是代表消费者向消息代理确认消息,生产者对此一无所知。请求响应涉及在出站消息上设置回复地址,然后在发送下一条消息之前等待接收该地址上的响应。有很多文章记录了这种事情,例如one

      或者,如果您只需要知道代理是否已收到来自客户端的消息并将其持久化,那么您可以使用 STOMP 内置的receipt 机制让代理向您发送一个收据,表明它已经处理了您发送的消息.然而,这并不能保证消费者已经处理了消息。

      【讨论】:

      • 你是对的,但我不知道如何解决我的问题:“消费者”受到每 5 秒一次 CURL 调用的限制,并且必须能够接受所有“中间” " 来自消费者的请求,“打包”,使用来自所有消费者的所有查询进行 CURL 调用,然后用答案回复他们。
      猜你喜欢
      • 2014-05-31
      • 2010-10-19
      • 1970-01-01
      • 2018-01-07
      • 2015-07-23
      • 1970-01-01
      • 2015-04-06
      • 2013-02-02
      • 2015-01-02
      相关资源
      最近更新 更多