【问题标题】:Non-blocking transactions when reading from ActiveMQ queue with STOMP使用 STOMP 从 ActiveMQ 队列读取时的非阻塞事务
【发布时间】:2013-02-27 19:48:55
【问题描述】:

我正在通过 STOMP 与 ActiveMQ 交互。我有一个发布消息的进程和一个订阅和处理消息的多个进程(大约 10 个并行实例)。

阅读消息后,我想确定如果由于某种原因我的应用程序失败/崩溃,消息不会丢失。所以很自然地,我转向交易。不幸的是,我发现一旦消费者在事务中读取了一条消息,在事务结束之前,以下所有消息都不会发送给其他消费者。

测试用例:abc 队列有 100 条消息。如果我在两个不同的浏览器选项卡中激活以下代码,第一个将在 10 秒内返回,第二个将在 20 秒内返回。

<?php
// Reader.php
$con = new Stomp("tcp://localhost:61613");
$con->connect();

$con->subscribe(
    "/queue/abc",
    array()
);

$tx = "tx3".microtime();
echo "TX:$tx<BR>";
$con->begin($tx);
$messages = array();
for ($i = 0; $i < 10; $i++) {
    $t = microtime(true);
    $msg = $con->readFrame();
    if (!$msg) {
        die("FAILED!");
    }
    $t = microtime(true)-$t; echo "readFrame() took $t MS to complete<BR>";
    array_push($messages, $msg);
    $con->ack($msg, $tx);
    sleep(1);
}
$con->abort($tx);

我在代码方面有什么遗漏吗?有没有办法配置 ActiveMQ(或发送标头),使事务从队列中删除项目,允许其他进程消费其他消息,如果事务失败或超时,将项目放回?

PS:我考虑过为每个阅读过程创建另一个队列 - DetentionQueue,但如果可以选择,我真的不想这样做。

【问题讨论】:

    标签: php transactions activemq nonblocking stomp


    【解决方案1】:

    您可能需要调整订阅的预取大小,以便 ActiveMQ 在客户端 2 有机会获得任何消息之前不会将队列上的消息发送到客户端 1。默认情况下,它设置为 1000,因此最好根据您的用例对其进行调整。

    您可以通过订阅框架上的“activemq.prefetchSize=1”标头设置预取大小。有关所有框架选项,请参阅 ActiveMQ Stomp 页面。

    【讨论】:

    • 我的特定 ActiveMQ 版本默认预取大小为 1(针对 AvctiveMQ 控制台检查)。可以肯定的是,我检查了您的建议(使用 activemq.prefetchSize=1),但它并没有改变结果。不过还是谢谢你:)
    猜你喜欢
    • 2014-10-16
    • 2019-02-08
    • 1970-01-01
    • 2012-12-08
    • 1970-01-01
    • 2019-02-12
    • 1970-01-01
    • 1970-01-01
    • 2016-09-16
    相关资源
    最近更新 更多