【问题标题】:Aws Sqs Consumer - Poll only when messages can be processed immediatelyAws Sqs Consumer - 仅在可以立即处理消息时进行轮询
【发布时间】:2021-01-23 23:07:23
【问题描述】:

我正在尝试创建一个 AWS SQS windows 服务使用者,它将以 10 个批次轮询消息。每条消息都将在其自己的任务中执行以进行并行执行。消息处理包括调用不同的 api 和发送电子邮件,因此可能需要一些时间。

我的问题是,首先,我只想在可以立即处理 10 条消息时轮询队列。这是由于 sqs 可见性超时,并且接收到的消息“等待”可能会超过可见性超时并“返回”队列。这将产生重复。我不认为调整可见性超时是好的,因为仍然有可能重复消息,这就是我试图避免的。其次,我想对并行度进行某种限制(例如,最大限制为 100 个并发任务),这样就可以保留服务器资源,因为服务器中还运行着其他应用程序。

如何做到这一点?或者有没有其他方法可以解决这些问题?

【问题讨论】:

    标签: c# concurrency task-parallel-library amazon-sqs


    【解决方案1】:

    这个答案做了以下假设:

    1. 应序列化从 AWS 获取消息。只有消息的处理应该是并行的。
    2. 应处理从 AWS 获取的每条消息。在所有获取的消息有机会处理之前,不应终止整个执行。
    3. 应等待每个消息处理操作。整个执行不应在所有已启动任务完成之前终止。
    4. 应忽略处理消息期间发生的任何错误。整个执行不应因为单个消息的处理失败而终止。
    5. 在从 AWS 获取消息期间发生的任何错误都应该是致命的。整个执行应该终止,但不是在所有当前正在运行的消息处理操作都完成之前。
    6. 执行机制应该能够处理从 AWS 提取操作返回的批处理消息数量与请求数量不同的情况。

    以下是(希望)满足这些要求的实现:

    /// <summary>
    /// Starts an execution loop that fetches batches of messages sequentially,
    /// and process them one by one in parallel.
    /// </summary>
    public static async Task ExecutionLoopAsync<TMessage>(
        Func<int, Task<TMessage[]>> fetchMessagesAsync,
        Func<TMessage, Task> processMessageAsync,
        int fetchCount,
        int maxDegreeOfParallelism,
        CancellationToken cancellationToken = default)
    {
        // Arguments validation omitted
        var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    
        // Count how many times we have acquired the semaphore, so that we know
        // how many more times we have to acquire it before we exit from this method.
        int acquiredCount = 0;
        try
        {
            while (true)
            {
                Debug.Assert(acquiredCount == 0);
                for (int i = 0; i < fetchCount; i++)
                {
                    await semaphore.WaitAsync(cancellationToken);
                    acquiredCount++;
                }
    
                TMessage[] messages = await fetchMessagesAsync(fetchCount)
                    ?? Array.Empty<TMessage>();
    
                for (int i = 0; i < messages.Length; i++)
                {
                    if (i >= fetchCount) // We got more messages than we asked for
                    {
                        await semaphore.WaitAsync();
                        acquiredCount++;
                    }
                    ProcessAndRelease(messages[i]);
                    acquiredCount--;
                }
    
                if (messages.Length < fetchCount)
                {
                    // We got less messages than we asked for
                    semaphore.Release(fetchCount - messages.Length);
                    acquiredCount -= fetchCount - messages.Length;
                }
    
                // This method is 'async void' because it is not expected to throw ever
                async void ProcessAndRelease(TMessage message)
                {
                    try { await processMessageAsync(message); }
                    catch { } // Swallow exceptions
                    finally { semaphore.Release(); }
                }
            }
        }
        catch (SemaphoreFullException)
        {
            // Guard against the (unlikely) scenario that the counting logic is flawed.
            // The counter is no longer reliable, so skip the awaiting in finally.
            acquiredCount = maxDegreeOfParallelism;
            throw;
        }
        finally
        {
            // Wait for all pending operations to complete. This could cause a deadlock
            // in case the counter has become out of sync.
            for (int i = acquiredCount; i < maxDegreeOfParallelism; i++)
                await semaphore.WaitAsync();
        }
    }
    

    使用示例:

    var cts = new CancellationTokenSource();
    
    Task executionTask = ExecutionLoopAsync<Message>(async count =>
    {
        return await GetBatchFromAwsAsync(count);
    }, async message =>
    {
        await ProcessMessageAsync(message);
    }, fetchCount: 10, maxDegreeOfParallelism: 100, cts.Token);
    

    【讨论】:

    • 感谢@Theodor 抽出宝贵时间来理解和回答问题。你的假设是正确的。对于 #5,当从 aws 获取时发生异常时。如果我捕捉到异常并只返回 0 条消息,它会起作用吗?
    • @eyese7en 是的,如果fetchMessagesAsync 返回了一个空数组,那么循环中就没有更多的事情要做了,fetchMessagesAsync 将立即被再次调用(没有任何延迟)。您可以考虑通过添加额外的TimeSpan fetchMessagesRetryDelay 参数来增强ExecutionLoopAsync 机制,并将await fetchMessagesAsync 行包含在包含指定持续时间的异步延迟的内部重试循环中。
    • @@Theodor 天真地,Thread.Sleep 会延迟工作吗?如果没问题,我会在几天内完全实施后将其标记为答案。
    • @eyese7en 是的,Thread.Sleep 可以,但await Task.Delay() 更好。这也有一个CancellationToken 参数,因此您可以将cancellationToken 作为参数传递,并确保取消请求尽可能响应。
    猜你喜欢
    • 2019-04-26
    • 2018-03-05
    • 1970-01-01
    • 2019-01-30
    • 1970-01-01
    • 2019-10-24
    • 1970-01-01
    • 2015-12-04
    • 1970-01-01
    相关资源
    最近更新 更多