【问题标题】:Using thousands of Tasks with a timeout efficiently有效地使用数千个超时任务
【发布时间】:2015-08-27 13:53:09
【问题描述】:

我正在实现一个库 L,它通过套接字与另一个应用程序 A 进行通信。

基本工作流程如下:

  1. L 连接到 A。
  2. L 向 A 发送约 50.000 条信息 I,并且 为每个发出的 I 创建一个任务 T。
  3. L 监听来自 A 的传入结果,一旦有结果,就使用 TaskCompletionSource 设置Tasks的结果T
  4. L 创建一个设置超时的任务 T2 (Task.WhenAny(T,Task.Delay(xx))
  5. L 使用 Task.WhenAll(T2) 等待所有发送信息的超时或结果。

管理底层数据结构完全没有问题。主要问题是在我的计算机上组装“主要”Task.WhenAll(T2) 大约需要 5-6 秒。 50.000 个条目(创建 50.000*2+1 个任务)。

但是,我想不出一种更轻量级的方式来完成同样的任务。它应该使用所有可用的核心并且是非阻塞的,并且还支持超时。

有没有办法使用 Parallel- 或 ThreadPool 类来提高性能?

编辑: 显示基本设置的代码: https://dotnetfiddle.net/gIq2DP

【问题讨论】:

  • 那么做一个任务大概需要100微秒?你打算给每项任务做多少工作? 100uS 或设置开销将淹没您希望实现的任何并行性最好显着更大。出于好奇,当您制造 50,000*2 个任务时,您的 VM 映像会增长多少?如果您制造 50,000 个任务并让每个任务都运行一个空程序,运行需要多长时间?在 1 核系统上?在 4 或 8 核系统上?
  • 好的,贴一些代码。我曾经对任务创建吞吐量进行了基准测试。在我的机器上大约是每秒 10m。成本在您的代码中,而不是在 TPL 中。
  • 如果您发布 CPU 分析器结果的屏幕截图会更好。
  • 任务的主要“任务”是等待其他应用程序返回结果。我使用任务作为简化并行编程的一种方式。目前在 4 核 i7 4770k @ 4.3 GHz 上运行,使用 16 GB RAM。在使用 20.000 个任务进行测试时,总内存消耗达到了 50 MB 的峰值。编辑:我之前对代码进行了基准测试。创建任务本身占用了所有 CPU 周期的 14% 左右,我想减少这些。
  • 显然所有这些任务实际上不能同时运行,因为它们每个会消耗 1MB 的堆栈 - 即需要 20GB。

标签: c# sockets parallel-processing task


【解决方案1】:

总共启动 n LongRunningTasks,其中 n 是您机器上的内核数。每个任务都应该在一个核心上运行。为您要发送的每个 I 创建 50K 新任务将是一种浪费。而是将任务设计为接受I 和套接字信息——这些信息将被发送到哪里

创建一个BlockingCollection<Tuple<I, SocketInfo>>。启动一项任务来填充此阻塞集合。您之前创建的其他 n 个长时间运行的任务可以继续获取信息元组和发送信息的地址,然后在一个循环中为您执行作业,当阻塞收集完成时会中断。

可以在长时间运行的任务本身中设置超时。

整个设置将使您的 CPU 最大程度地忙于有用的工作,而不是让它不必要地忙于创建 50K 任务的“工作”。 p>

由于在主内存之外发生的操作(如网络操作)对于 CPU 来说是 very very slow,因此请随意设置 n 不仅等于您机器中的内核数,甚至三倍的价值。在我的代码演示中,我将其设置为仅等于内核数。

使用提供的链接中的代码,这是一种方法...

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Net.NetworkInformation;
using System.Threading.Tasks;

namespace TestConsoleApplication
{
    public static class Test
    {
        public static void Main()
        {
            TaskRunningTest();
        }

        private static void TaskRunningTest()
        {
            var s = new Stopwatch();
            const int totalInformationChunks = 50000;
            var baseProcessorTaskArray = new Task[Environment.ProcessorCount];
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            var tcs = new TaskCompletionSource<int>();

            var itemsToProcess = new BlockingCollection<Tuple<Information, Address>>(totalInformationChunks);

            s.Start();
            //Start a new task to populate the "itemsToProcess"
            taskFactory.StartNew(() =>
            {
                // Add Tuples of Information and Address to which this information is to be sent to.
                Console.WriteLine("Done intializing all the jobs...");
                // Finally signal that you are done by saying..
                itemsToProcess.CompleteAdding();
            });

            //Initializing the base tasks
            for (var index = 0; index < baseProcessorTaskArray.Length; index++)
            {
                var thisIndex = index;
                baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
                {
                    while (!itemsToProcess.IsAddingCompleted && itemsToProcess.Count != 0)
                    {
                        Tuple<Information, Address> item;
                        itemsToProcess.TryTake(out item);
                        //Process the item
                        tcs.TrySetResult(thisIndex);
                    }
                });
            }

            // Need to provide new timeout logic now
            // Depending upon what you are trying to achieve with timeout, you can devise out the way

            // Wait for the base tasks to completely empty OR
            // timeout and then stop the stopwatch.
            Task.WaitAll(baseProcessorTaskArray); 
            s.Stop();
            Console.WriteLine(s.ElapsedMilliseconds);
        }

        private class Address
        {
            //This class should have the socket information
        }

        private class Information
        {
            //This class will have the Information to send
        }
    }
}

【讨论】:

    【解决方案2】:

    分析表明,大部分时间(90%?)都花在了计时器的设置、到期和处置上。这对我来说似乎是合理的。

    也许您可以创建自己的超级便宜的超时机制。将超时排入按到期时间排序的优先级队列中。然后,每 100 毫秒运行一个计时器,并使该计时器使优先级队列中到期的所有内容都过期。

    这样做的代价是每次超时一个TaskCompletionSource 和一些小的进一步处理。

    您甚至可以通过将超时从队列中删除并删除 TaskCompletionSource 来取消超时。

    【讨论】:

      猜你喜欢
      • 2020-06-19
      • 1970-01-01
      • 2022-01-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-28
      • 1970-01-01
      • 2021-10-17
      相关资源
      最近更新 更多