【问题标题】:Parallel.Foreach + yield return?Parallel.Foreach + 收益回报?
【发布时间】:2012-01-14 19:41:31
【问题描述】:

我想像这样使用并行循环来处理一些事情:

public void FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
    });

}

好的,它工作正常。但是如果我想让 FillLogs 方法返回一个 IEnumerable 怎么办?

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
        yield return cpt // KO, don't work
    });

}

编辑

这似乎不可能......但我使用这样的东西:

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    return computers.AsParallel().Select(cpt => cpt);
}

但是我把cpt.Logs = cpt.GetRawLogs().ToList(); 指令放在哪里

【问题讨论】:

  • 你的IEnumerable&lt;IComputer&gt; return-type 不会承担负载。

标签: c# yield-return parallel.foreach


【解决方案1】:

短版 - 不,这是不可能通过迭代器块实现的;较长的版本可能涉及调用者的迭代器线程(执行出队)和并行工作者(执行入队)之间的同步队列/出队;但作为旁注 - 日志通常受 IO 限制,并行处理受 IO 限制的事物通常效果不佳。

如果调用者要花一些时间消费每个,那么一次只处理一个日志的方法可能会有一些优点,但可以做到这一点while 调用者正在消费之前的日志;即它在yield 之前开始 Task 下一个项目,并在yield 之后等待完成...但这又是非常复杂的.举个简单的例子:

static void Main()
{
    foreach(string s in Get())
    {
        Console.WriteLine(s);
    }
}

static IEnumerable<string> Get() {
    var source = new[] {1, 2, 3, 4, 5};
    Task<string> outstandingItem = null;
    Func<object, string> transform = x => ProcessItem((int) x);
    foreach(var item in source)
    {
        var tmp = outstandingItem;

        // note: passed in as "state", not captured, so not a foreach/capture bug
        outstandingItem = new Task<string>(transform, item);
        outstandingItem.Start();

        if (tmp != null) yield return tmp.Result;
    }
    if (outstandingItem != null) yield return outstandingItem.Result;
}
static string ProcessItem(int i)
{
    return i.ToString();
}

【讨论】:

  • 不完全是,但我有一个类似的问题(被忽视:))将产量结果返回到 parallel.foreach。我想到的不同背景可能会对某人有所帮助。 stackoverflow.com/questions/32183463/…
【解决方案2】:

我不想冒犯,但可能缺乏理解。 Parallel.ForEach 表示 TPL 将根据可用硬件在多个线程中运行 foreach。但这意味着,ii 可以并行完成这项工作! yield return 让您有机会从列表中获取一些值(或其他任何值),并在需要时将它们一一归还。它避免了首先找到所有匹配条件的项目然后迭代它们的需要。这确实是性能优势,但不能并行完成。

【讨论】:

  • 但是,如果每一代收益返回值都需要一些时间,那么您是否希望并行处理下一个收益返回值以便您可以更快地获得它。我想像有一个缓冲区或什么的?我不知道是否缺乏理解,但我可以想象(即使理解)人们想要更快屈服的情况。我猜 yield 的意图是在必要时进行处理,所以 yield return 可能并不严格符合。但我当然可以想象需要什么......
【解决方案3】:

怎么样

            Queue<string> qu = new Queue<string>();
            bool finished = false;
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(get_list(), (item) =>
                {
                    string itemToReturn = heavyWorkOnItem(item);         
                    lock (qu)
                       qu.Enqueue(itemToReturn );                        
                });
                finished = true;
            });

            while (!finished)
            {
                lock (qu)
                    while (qu.Count > 0)
                        yield return qu.Dequeue();
                //maybe a thread sleep here?
            }

编辑: 我认为这样更好:

        public static IEnumerable<TOutput> ParallelYieldReturn<TSource, TOutput>(this IEnumerable<TSource> source, Func<TSource, TOutput> func)
        {
            ConcurrentQueue<TOutput> qu = new ConcurrentQueue<TOutput>();
            bool finished = false;
            AutoResetEvent re = new AutoResetEvent(false);
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(source, (item) =>
                {
                    qu.Enqueue(func(item));
                    re.Set();
                });
                finished = true;
                re.Set();
            });

            while (!finished)
            {
                re.WaitOne();
                while (qu.Count > 0)
                {
                    TOutput res;
                    if (qu.TryDequeue(out res))
                        yield return res;
                }
            }
        }   

Edit2:我同意简短的回答。这段代码没用;你不能打破 yield 循环。

【讨论】:

    【解决方案4】:

    虽然这个问题很老,但我还是设法做一些有趣的事情。

    class Program
    {
        static void Main(string[] args)
        {
            foreach (var message in GetMessages())
            {
                Console.WriteLine(message);
            }
        }
    
    
        // Parallel yield
        private static IEnumerable<string> GetMessages()
        {
            int total = 0;
            bool completed = false;
            var batches = Enumerable.Range(1, 100).Select(i => new Computer() { Id = i });
            var qu = new ConcurrentQueue<Computer>();
            Task.Run(() =>
            {
                try
                {
                    Parallel.ForEach(batches,
                        () => 0,
                        (item, loop, subtotal) =>
                        {
                            Thread.Sleep(1000);
                            qu.Enqueue(item);
                            return subtotal + 1;
                        },
                        result => Interlocked.Add(ref total, result));
                }
                finally
                {
                    completed = true;
                }
            });
    
            int current = 0;
            while (current < total || !completed)
            {
                SpinWait.SpinUntil(() => current < total || completed);
                if (current == total) yield break;
                current++;
                qu.TryDequeue(out Computer computer);
                yield return $"Completed {computer.Id}";
            }
        }
    }
    
    public class Computer
    {
        public int Id { get; set; }
    }
    

    与 Koray 的回答相比,这确实使用了所有 CPU 内核。

    【讨论】:

      【解决方案5】:

      您可以使用以下扩展方法

      public static class ParallelExtensions
      {
          public static IEnumerable<T1> OrderedParallel<T, T1>(this IEnumerable<T> list, Func<T, T1> action)
          {
              var unorderedResult = new ConcurrentBag<(long, T1)>();
              Parallel.ForEach(list, (o, state, i) =>
              {
                  unorderedResult.Add((i, action.Invoke(o)));
              });
              var ordered = unorderedResult.OrderBy(o => o.Item1);
              return ordered.Select(o => o.Item2);
          }
      }
      

      像这样使用:

      public void FillLogs(IEnumerable<IComputer> computers)
      {
          cpt.Logs = computers.OrderedParallel(o => o.GetRawLogs()).ToList();
      }
      

      希望这会为您节省一些时间。

      【讨论】:

        猜你喜欢
        • 2011-05-27
        • 1970-01-01
        • 2018-01-23
        • 2010-12-01
        • 1970-01-01
        • 1970-01-01
        • 2011-01-13
        • 2010-09-22
        相关资源
        最近更新 更多