【问题标题】:Performing concurrent tasks in C#在 C# 中执行并发任务
【发布时间】:2015-09-05 04:51:29
【问题描述】:

我有一项服务需要尽快读取来自 Amazon SQS 的消息。我们预计流量会很大,我希望能够以每秒 10K 条消息的速度阅读。不幸的是,我目前的速度约为 10 条消息/秒。显然,我有工作要做。

这是我正在使用的(转换为控制台应用程序以使测试更容易):

private static int _concurrentRequests;
private static int _maxConcurrentRequests;

public static void Main(string[] args) {
    _concurrentRequests = 0;
    _maxConcurrentRequests = 100;

    var timer = new Timer();
    timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
    timer.Interval = 10;
    timer.Enabled = true;

    Console.ReadLine();
    timer.Dispose();
}

public static void OnTimedEvent(object s, ElapsedEventArgs e) {
    if (_concurrentRequests < _maxConcurrentRequests) {
        _concurrentRequests++;
        ProcessMessages();
    }
}

public static async Task ProcessMessages() {
    var manager = new MessageManager();
    manager.ProcessMessages();  // this is an async method that reads in the messages from SQS

    _concurrentRequests--;
}

我没有得到接近 100 个并发请求,而且它似乎没有每 10 毫秒触发一次 OnTimedEvent

我不确定Timer 是否是正确的方法。我对这种编码没有太多经验。在这一点上,我愿意尝试任何事情。

更新

多亏了 calebboyd,我离实现目标又近了一点。这是一些非常糟糕的代码:

private static SemaphoreSlim _locker;

public static void Main(string[] args) {
    _manager = new MessageManager();

    RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
    _locker = new SemaphoreSlim(10, 10);
    while (true) {
        Thread thread = new Thread(new ParameterizedThreadStart(Process));
        thread.Start();
    }
}

private static async void Process(object args) {
    _locker.WaitAsync();
    try {
        await _manager.ProcessMessages();
    }
    finally {
        _locker.Release();
    }

}

通过这个,我能够接近每秒读取相当数量的消息,但问题是我的ProcessMessages 呼叫永远不会完成(或者可能会在很长时间后完成)。我在想我可能需要限制我在任何时候运行的线程数。

关于如何改进此代码以便ProcessMessages 有机会完成的任何建议?

【问题讨论】:

  • 定时器有一个限制,最多只能每 15.6 毫秒触发一次。这是 .NET 实现定时器的一个限制。
  • 声称异步的代码没有await——这很奇怪。考虑更新您的代码,使其看起来更真实。
  • 这是一个非常重要的问题,可帮助您确定多线程的可能性 - 如果您在单个线程上连续运行请求,您每秒可以处理多少条消息?
  • 您的主要目标是尽快汇集亚马逊 SQS 吗?
  • @Irving 所以你的数据库是瓶颈——你是否考虑过缓冲消息,并将它们批量插入数据库?

标签: c# multithreading asynchronous parallel-processing amazon-sqs


【解决方案1】:

因为没有等待 MessageManager 对象上的 ProcessMessages 方法,所以我假设它绑定到执行它的同一线程。仅将函数标记为 async 不会将工作传递给新线程。有了这个假设,这段代码实际上并没有用多个线程执行。您可以使用以下代码在更多的线程池中执行您的代码。

管理器对象可能无法处理并发使用。所以我在 Task.Run lambda 中创建它。这也可能很昂贵,因此不切实际。

async Task RunBatchProcessingForeverAsync () {
    var lock = new SemaphoreSlim(initialCount: 10);
    while (true) {
        await lock.WaitAsync();
        Task.Run(() => {
            try {
                var manager = new MessageManager();
                manager.ProcessMessages();
            } finally {
                lock.Release();
            }
        });
    }
}

我已经有一段时间没有编写 C# 了,但这应该会同时、重复、永远运行你的方法 10 次。

【讨论】:

  • 我从未见过这种模式。我喜欢它。
  • 谢谢!我能够在这方面取得一些进展,但我仍然遇到了障碍。我不得不稍微更新您的代码以使其足够快。你能看看我的更新吗?
  • 你应该可以从你的 main 方法调用RunBatchProcessingForeverAsync().Wait()。看起来您可能正在创建无限数量的线程(while 循环中没有await)。您的ProcessMessages 是否返回可以等待的任务?
  • 我也担心我正在创建的线程数量,但不知道如何限制它。我实际上在等待我给ProcessMessages 的电话。如何在 while 循环中添加等待?
  • 非常感谢您的建议。要回答您的问题,是的,ProcessMessages 上的签名如下所示:public async Task&lt;int&gt; ProcessMessages()。有机会我会试试你的建议。
【解决方案2】:

正如@calebboyd 所建议的,您必须首先使您的线程异步。现在,如果你去这里—— Where to use concurrency when calling an API,您会看到一个异步线程足以快速汇集网络资源。如果您能够在一个请求中从亚马逊获取多条消息,那么您的生产者线程(对亚马逊进行异步调用的线程)就可以了——它每秒可以发送数百个请求。这不会是你的瓶颈。但是,处理接收到的数据的延续任务被交给线程池。在这里,您有机会遇到瓶颈 - 假设每秒有 100 个响应到达,每个响应包含 100 条消息(达到您的 10K msgs/sec 近似值)。每秒您有 100 个新任务,每个任务都需要您的线程处理 100 条消息。现在有两种选择:(1) 这些消息的处理不受 CPU 限制 - 您只需将它们发送到您的数据库,或者 (2) 您执行 CPU 消耗计算,例如科学计算、序列化或一些繁重的业务逻辑。如果 (1) 是您的情况,那么瓶颈将向后推向您的数据库。如果 (2),那么您别无选择,只能扩大/缩小或优化计算。但是您的瓶颈可能不是生产线程 - 如果它实施正确(请参阅上面的链接以获取示例)。

【讨论】:

  • 这是我的问题。数据库现在绝对是瓶颈。不过,实现批量插入的想法应该会有很大帮助。感谢您的帮助!
【解决方案3】:

我假设异步方法在线程池中排队,该线程池的线程数与可用处理器的数量一样多。您可能会生成 100 个请求,但它们仍由 8 个线程执行。尝试创建 N 个线程的数组并使用它们。

【讨论】:

  • 调用 ThreadPool.GetMaxThreads 在我的机器上返回 1023 个可用线程。我有一个处理器,有四个内核和 8 个超线程。
  • @Enigmativity 这并不意味着 Robert Hudjakov 完全错误 - 据我了解,在理论上的纯异步程序中,线程池不会创建超过 8 个(在您的情况下)线程来处理所有排队的任务,即使缓冲区已饱和。不过,我可能是错的。
猜你喜欢
  • 1970-01-01
  • 2021-04-08
  • 1970-01-01
  • 2018-10-09
  • 2021-06-26
  • 1970-01-01
  • 1970-01-01
  • 2012-01-04
  • 1970-01-01
相关资源
最近更新 更多