【问题标题】:Why simple multi task doesn't work when multi thread does?为什么多线程时简单的多任务不起作用?
【发布时间】:2016-12-13 11:22:29
【问题描述】:
var finalList = new List<string>();
var list = new List<int> {1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ................. 999999};

var init = 0;
var limitPerThread = 5;

var countDownEvent = new CountdownEvent(list.Count);

for (var i = 0; i < list.Count; i++)
{
    var listToFilter = list.Skip(init).Take(limitPerThread).ToList();
    new Thread(delegate()
                   {
                       Foo(listToFilter);
                       countDownEvent.Signal();
                   }).Start();    
    init += limitPerThread;
}

//wait all to finish
countDownEvent.Wait();


private static void Foo(List<int> listToFilter)
{
    var listDone = Boo(listToFilter);
    lock (Object)
    {
        finalList.AddRange(listDone);
    }
}

这不是:

var taskList = new List<Task>();

for (var i = 0; i < list.Count; i++)
{
    var listToFilter = list.Skip(init).Take(limitPerThread).ToList();
    var task = Task.Factory.StartNew(() => Foo(listToFilter)); 
    taskList.add(task);   
    init += limitPerThread;
}

//wait all to finish
Task.WaitAll(taskList.ToArray());

这个进程最终必须创建至少 700 个线程。当我使用 Thread 运行时,它可以工作并创建所有这些。但是对于Task,它不会.. 似乎它没有开始多个Tasks async。

我真的很想知道为什么....有什么想法吗?

编辑

另一个带有 PLINQ 的版本(按照建议)。

var taskList = new List<Task>(list.Count);
Parallel.ForEach(taskList, t =>
                   {
                       var listToFilter = list.Skip(init).Take(limitPerThread).ToList();
                       Foo(listToFilter);
                       init += limitPerThread;
                       t.Start();
                   });
Task.WaitAll(taskList.ToArray());

EDIT2:

public static List<Communication> Foo(List<Dispositive> listToPing)
{
    var listResult = new List<Communication>();
    foreach (var item in listToPing)
    {
        var listIps = item.listIps;
        var communication = new Communication
        {
            IdDispositive = item.Id
        };

        try
        {
            for (var i = 0; i < listIps.Count(); i++)
            {
                var oPing = new Ping().Send(listIps.ElementAt(i).IpAddress, 10000);
                if (oPing != null)
                {
                    if (oPing.Status.Equals(IPStatus.TimedOut) && listIps.Count() > i+1)
                        continue;
                    if (oPing.Status.Equals(IPStatus.TimedOut))
                    {
                        communication.Result = "NOK"; 
                        break;
                    }
                    communication.Result = oPing.Status.Equals(IPStatus.Success) ? "OK" : "NOK";
                    break;
                }
                if (listIps.Count() > i+1)
                    continue;
                communication.Result = "NOK";
                break;
            }
        }
        catch
        {
            communication.Result = "NOK";
        }
        finally
        {
            listResult.Add(communication);
        }
    }

    return listResult;
}

【问题讨论】:

  • 你能一个一个地处理元素吗?也就是说,您可以在单个元素而不是列表上调用Boo() 吗?
  • 我无法一个接一个地处理它们。这是一个 ping 多个 ips 地址的过程...我需要在不到 2 分钟的时间内 ping 所有这些 (+- 3000)。我需要存储结果。
  • 等等... 什么 Boo() 做了什么,实际上?它不会简单地产生一个ping 进程,是吗?
  • 您应该使用Ping class。这甚至有一个 SendAsync() 方法,这是你想要的。
  • 一个典型的 XY 问题。您询问有关创建 700 个线程或 3000 个进程的问题,但您的问题是“如何进行大规模 Ping”。回答 Y 不会涉及 X。

标签: c# multithreading task-parallel-library task multitasking


【解决方案1】:

Tasks 不是多线程。它们可以用于此目的,但大多数情况下它们实际上用于相反的用途 - 在单个线程上进行多路复用。

要将任务用于多线程,我建议使用Parallel LINQ。它已经进行了许多优化,例如列表的智能分区以及仅生成与 CPU 内核一样多的线程等。


要理解Taskasync,可以这样想——典型的工作负载通常包括需要等待的IO。也许你读了一个文件,或者查询一个网络服务,或者访问一个数据库,或者其他什么。关键是 - 您的线程要等待很长时间(至少在 CPU 周期中),直到您从某个遥远的目的地获得响应。

在 Olden Days™ 中,这意味着您的帖子被锁定(暂停),直到收到回复。如果您想在此期间做其他事情,则需要生成一个新线程。这是可行的,但效率不是很高。每个操作系统线程都会带来很大的开销(内存、内核资源)。最后可能会有多个线程主动消耗 CPU,这意味着操作系统需要在它们之间切换,以便每个线程都获得一点 CPU 时间,而这些“上下文切换”非常昂贵。

async 更改了该工作流程。现在您可以在同一个线程上执行多个工作负载。虽然其中一项工作是await来自遥远来源的结果,但另一项工作可以介入并使用该线程做其他有用的事情。当第二个工作负载达到自己的await 时,第一个工作负载可以唤醒并继续。

毕竟,生成的线程数超过 CPU 内核数是没有意义的。你不会以这种方式完成更多的工作。恰恰相反——更多的时间将花在切换线程上,而更少的时间将用于有用的工作。

这就是 Task/async/await 最初的设计目的。然而,并行 LINQ 也利用了它并将其重用于多线程。在这种情况下,您可以这样看待它 - 其他线程就是您的主线程是您的主线程正在等待的“遥远目的地”。

【讨论】:

  • 好文字,兄弟...我也尝试了 PLINQ,但没有成功..我将在实现中编辑问题。如果您能看一下,我将不胜感激。
  • “任务不是多线程的” - 不,并非总是如此,但 StartNew() 确实意味着至少将它们推送到 ThreadPool。
  • 没错,但普遍的误解是异步 == 多线程。看来OP也是这么想的。
  • @HenkHolterman StartNew 使用 Current 同步上下文,它可以是 UI 上下文,因此不会将任务发送到线程池
【解决方案2】:

任务在线程池上执行。这意味着少数线程将服务于大量任务。您有多线程,但不是每个生成的任务都有一个线程。

你应该使用任务。您的目标应该是使用与 CPU 一样多的线程。通常,线程池会为您执行此操作。

【讨论】:

    【解决方案3】:

    您是如何衡量绩效的?您认为700 线程会比4 线程执行的700 任务运行得更快吗?不,他们不会。

    似乎它没有启动多个异步任务

    你是怎么想到这个的?正如 cmets 和其他答案中所建议的那样,您可能需要删除线程创建,因为在创建 700 线程后,您会降低系统性能,因为您的线程会在没有任何工作的情况下相互争夺处理器时间完成得更快。

    因此,您需要将 IO 操作的 async/await 添加到 Foo 方法中,使用 SendPingAsync 版本。此外,您的方法可以简化,因为对 listIps.Count() &gt; i + 1 条件的许多检查都是无用的 - 您可以在 for 条件块中执行此操作:

    public static async Task<List<Communication>> Foo(List<Dispositive> listToPing)
    {
        var listResult = new List<Communication>();
        foreach (var item in listToPing)
        {
            var listIps = item.listIps;
            var communication = new Communication
            {
                IdDispositive = item.Id
            };
    
            try
            {
                var ping = new Ping();
                communication.Result = "NOK";
    
                for (var i = 0; i < listIps.Count(); i++)
                {
                    var oPing = await ping.SendPingAsync(listIps.ElementAt(i).IpAddress, 10000);
                    if (oPing != null)
                    {
                        if (oPing.Status.Equals(IPStatus.Success)
                        {
                            communication.Result = "OK";
                            break;
                        }
                    }
                }
            }
            catch
            {
                communication.Result = "NOK";
            }
            finally
            {
                listResult.Add(communication);
            }
        }
    
        return listResult;
    }
    

    您的代码的其他问题是 PLINQ 版本不是线程安全的:

    init += limitPerThread;
    

    并行执行时可能会失败。你可以引入一些辅助方法,比如this answer:

    private async Task<List<PingReply>> PingAsync(List<Communication> theListOfIPs)
    {
        Ping pingSender = new Ping();
        var tasks = theListOfIPs.Select(ip => pingSender.SendPingAsync(ip, 10000));
        var results = await Task.WhenAll(tasks);
    
        return results.ToList();
    }
    

    并进行这种检查(为简单起见,删除了try/catch 逻辑):

    public static async Task<List<Communication>> Foo(List<Dispositive> listToPing)
    {
        var listResult = new List<Communication>();
        foreach (var item in listToPing)
        {
            var listIps = item.listIps;
            var communication = new Communication
            {
                IdDispositive = item.Id
            };
            var check = await PingAsync(listIps);
            communication.Result = check.Any(p => p.Status.Equals(IPStatus.Success)) ? "OK" : "NOK";
         }
     }
    

    您可能应该使用Task.Run 而不是Task.StartNew,以确保您没有阻塞 UI 线程。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-06-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-02
      • 1970-01-01
      • 2021-08-07
      • 1970-01-01
      相关资源
      最近更新 更多