【问题标题】:Read messages from WebSphere in destructive manner以破坏性方式从 WebSphere 读取消息
【发布时间】:2018-05-12 06:54:44
【问题描述】:

我正在尝试从 WebSphere 执行破坏性读取,即我读取了应该立即从队列中删除的消息。

我编写的代码运行良好,直到我开始弄乱消息。例如,最后一个是“添加一条消息,读取它,等待空队列,然后添加两条消息”。在我的场景中,这个程序应该读取第一条消息,等到出现某些内容,然后再读取它。

但是,问题是我遇到了被卡住的情况。我在队列中有一条消息,但如果使用 BROWSE 或 CURSOR,我无法阅读。这是我的代码:

MQEnvironment.UserId = _queueSettings.UserName;
MQEnvironment.Password = _queueSettings.Password;


var manager = new MQQueueManager(_queueSettings.QueueManagerName, _queueSettings.ChannelName, _queueSettings.ConnectionName);
var queue = manager.AccessQueue(_queueSettings.QueueName, MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_INPUT_SHARED | MQC.MQOO_BROWSE);

var browseFirstOptions = new MQGetMessageOptions { Options = MQC.MQGMO_BROWSE_FIRST };
var cursorOptions = new MQGetMessageOptions { Options = MQC.MQGMO_MSG_UNDER_CURSOR };

var currentOptions = browseFirstOptions;

while (!cancellationToken.IsCancellationRequested)
{
    var logger = _contextlessLogger.ForContext("requestId", Guid.NewGuid());
    try
    {
        var msg = new MQMessage();
        queue.Get(msg, currentOptions);

        if (currentOptions == browseFirstOptions)
        {
            currentOptions = cursorOptions;
            continue;
        }

        string messageText = msg.ReadString(msg.MessageLength);
        RunProcessingTask(logger, messageText);
    }
    catch (MQException ex) when (IsNoMessagesException(ex) && currentOptions != browseFirstOptions)
    {
        currentOptions = browseFirstOptions;
    }
    catch (MQException ex) when (IsNoMessagesException(ex))
    {
        const int sleepIntervalMs = 5000;
        _contextlessLogger.Information("No messages in the queue. Sleeping for {sleepIntervalMs}ms", sleepIntervalMs);
        await Task.Delay(sleepIntervalMs);
    }
    catch (Exception ex)
    {
        logger.Error(ex, "Unexpected error occured");
    }
}

private static bool IsNoMessagesException(MQException exception) =>
    exception.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE
    || exception.ReasonCode == MQC.MQRC_NO_MSG_UNDER_CURSOR;

我只收到20332034,而我可以在 UI 中看到有一条消息。

怎么可能呢?也许我做错了?


我也添加了 Java 标记,因为它们的代码完全没有区别,例如我使用 this code 作为参考。

【问题讨论】:

  • 为什么要浏览然后获取?在您的逻辑中,它会将每条消息记录两次?我认为最好使用 SYNCPOINT,记录它,然后提交。对于@DanielSteinmann 的回答,运行DIS QSTATUS(NAME.OF.QUEUE) UNCOM 并查看UNCOMNO 还是一个数字,如果它是一个数字,则消息未提交,因此您将无法获取它。请注意,不推荐使用 MQEnvironment,因为它不是线程安全的,最好使用属性哈希表。
  • @JoshMc 因为文档说当我第一次访问队列时,它的逻辑位置是-1,而不是在第一条消息上。这就是我先浏览队列的原因:因为如果必须将光标位置设置为0,然后定期读取队列。
  • 您可以在没有浏览的情况下进行获取,它将获得队列中的“第一条”消息。 first 的含义将取决于队列本身的消息顺序传递设置。默认为 PRIORITY,这意味着将根据优先级传递事物,优先级较高的消息首先传递(相同优先级的消息将按 FIFO 顺序传递)。如果设置为 FIFO,则所有消息都按 FIFO 顺序传递,消息优先级被忽略。
  • 看看你的MQ安装下的以下目录:<MQ Base install folder>\tools\dotnet\samples\cs\base\SimpleGet\SimpleGet.cs,这是一个不使用浏览的简单get示例。
  • tools\dotnet\samples\cs\base\SimpleXAGet\SimpleXAGet.cs 是一个使用 .Net 事务并在同步点下获取消息的示例。

标签: java c# .net queue ibm-mq


【解决方案1】:

虽然我可以在 UI 中看到有一条消息。

我假设您看到当前队列深度为 1。

如果您将消息放入队列,IBM MQ 会立即增加当前队列深度,即在您提交事务之前。但是您只能在事务提交之后 收到消息。

【讨论】:

  • 嗯。谢谢你的想法,如果我每次清空队列时调用AccessQueue,这段代码就可以正常工作。没有更好的方法吗,你知道吗?
  • 我猜你必须使用MQGMO_BROWSE_NEXT 进行后续调用。但正如@JoshMc 指出的那样,我会在事务中收到消息,并在成功处理任务完成后提交事务。
猜你喜欢
  • 2012-11-05
  • 1970-01-01
  • 2011-03-28
  • 2016-10-22
  • 2013-10-22
  • 1970-01-01
  • 1970-01-01
  • 2013-09-13
  • 2014-04-19
相关资源
最近更新 更多