【问题标题】:Handling TimeoutException when receiving messages asynchronously from MessageSession从 MessageSession 异步接收消息时处理 TimeoutException
【发布时间】:2015-06-02 22:54:31
【问题描述】:

目前,我的解决方案涉及 1 个 Web 和 1 个 Worker 角色,通过 Azure 服务总线相互“交谈”:

  • Web 角色将消息发送到“请求”队列。
  • Worker 从“请求”队列中获取消息,执行一些工作并将响应消息放置到“响应”队列中,由 Web 角色从中获取。
  • Web 角色发送的每个请求消息都必须找到您的响应,这就是为什么我用唯一的相关标识符“标记”每个请求消息。 Worker 从请求中获取此标识符并用它“标记”响应。

一开始,我会发送一堆消息:

var messages = tasksForExecution.Select(s =>
            {
                var message = new BrokeredMessage(s);
                message.Properties.Add("action", "submit");
                message.ReplyToSessionId =  Guid.NewGuid().ToString("D");
                return message;
            }).ToList();
var correlationIdentifiers = messages.Select(x => x.ReplyToSessionId).ToList();
messages.ForEach(message => _requestQueueClient.SendAsync(message));

得到响应:

var sessions = correlationIdentifiers
            .Select(x=>_responseQueueClient.AcceptMessageSession(x)).ToList();
var receiveMessageTasks = sessions.Select(session => session.ReceiveAsync()).ToList();
var allReadyTask = Task.WhenAll(receiveMessageTasks).ContinueWith(x =>
            {
                ...extract data from result...
            }
..........
..........
allReadyTask.Wait();

如您所见,我正在创建一堆任务,当所有任务都完成后,调用回调来提取一些数据。但是,如果其中一个任务出现异常怎么办? -> 捕获此异常,并在调用Wait() 时将其提供给我们。由于ReceiveAsync() 方法的默认超时时间是 1 分钟,因此很有可能收到 TimeoutException,因为 worker 可能稍后完成它的工作,并且队列中不会存在此会话的消息。我发现了很多例子,人们捕捉到这种类型的异常,使用它,然后再次尝试接收——他们给了工人一分钟的时间来完成工作。我怎样才能在我的场景中做到这一点。谢谢!

【问题讨论】:

    标签: c# azure multitasking servicebus


    【解决方案1】:

    我正在发布我对该问题的解决方案。请参考上一篇文章了解变量。

    var receiveMessageTasks = sessions.Select(ReceiveMessageAsync).ToList();
    var allReadyTask = Task.WhenAll(receiveMessageTasks).ContinueWith(x =>
            {
                ...extract data from result...
            }
    allReadyTask.Wait();
    

    其中receiveMessageTasks方法如下:

    private async Task<BrokeredMessage> ReceiveMessageAsync(MessageSession session)
        {
            BrokeredMessage message = null;
    
            // We can specify dead line for processing, but since our worker will return error message if a problem arise,
            // I prefer to wait endlessly
            // var timesToRetry = 0;
    
            // if Receive operation throws an exception, then message will be null
            while (message == null /* && timesToRetry < 5*/)
            {
                try
                {
                    var task = session.ReceiveAsync().ConfigureAwait(false);
                    await task;
                    message = task.GetAwaiter().GetResult();
                    //timesToRetry++;
                }
                catch (Exception e)
                {
                    // session was idle for a long time and azure closed it -> renew it
                    session = _responseQueueClient.AcceptMessageSession(session.SessionId);
                }
            }
            return message;
        } 
    

    请注意ConfigureAwait(false)Here你可以找到问题描述:)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-08-02
      • 1970-01-01
      • 2017-02-12
      • 1970-01-01
      • 1970-01-01
      • 2016-08-19
      相关资源
      最近更新 更多