【问题标题】:Parallel.Invoke - Dynamically creating more 'threads'Parallel.Invoke - 动态创建更多“线程”
【发布时间】:2012-05-31 17:33:25
【问题描述】:

我正在自学 Parallel.Invoke 和一般的并行处理,以便在当前项目中使用。我需要朝着正确的方向努力,以了解如何根据需要动态\智能地分配更多并行“线程”。

举个例子。假设您正在解析大型日志文件。这涉及从文件中读取,对返回的行进行某种解析,最后写入数据库。

所以对我来说,这是一个可以从并行处理中受益的典型问题。

作为简单的第一遍,以下代码实现了这一点。

Parallel.Invoke(
  ()=> readFileLinesToBuffer(),
  ()=> parseFileLinesFromBuffer(),
  ()=> updateResultsToDatabase()    
);

幕后

  1. readFileLinesToBuffer() 读取每一行并存储到缓冲区。
  2. parseFileLinesFromBuffer 出现并使用缓冲区中的行,然后假设它将它们放在另一个缓冲区中,以便 updateResultsToDatabase() 可以出现并使用此缓冲区。

所以显示的代码假定三个步骤中的每一个都使用相同的时间\资源,但可以说 parseFileLinesFromBuffer() 是一个长时间运行的过程,因此您希望并行运行两个方法,而不是只运行其中一种方法.

如何让代码根据它可能感知到的任何瓶颈智能地决定执行此操作?

从概念上讲,我可以看到一些监视缓冲区大小的方法可能如何工作,例如产生一个新的“线程”以提高速率消耗缓冲区...但我认为此类问题已被考虑在放在一起TPL 库。

一些示例代码会很棒,但我真的只需要一个线索来了解我接下来应该研究哪些概念。看起来可能 System.Threading.Tasks.TaskScheduler 掌握着关键?

【问题讨论】:

  • 使用 async/await,我想说只写“正常”的顺序/命令式代码,并确保您正在执行异步调用而不是任何阻塞调用。另一种方法(尽管可能为此过度使用)是使用 TPL DataFlow,您可以在其中设置处理“块”,然后将它们连接起来。 msdn.microsoft.com/en-us/devlabs/gg585582.aspx
  • 元素的顺序对你来说重要吗?
  • 不,顺序不重要。我认为只要我将文件中的行放入数据库中,我就可以在通过其他“应用程序”(例如报告或 Web 应用程序)完成的查询中进行任何排序。 我正在根据以下建议忙着研究 Rx(但到目前为止,所有建议都对我的学习很有用...ca-chink...ca-chink..随着各个部分的到位)

标签: c# task-parallel-library


【解决方案1】:

您是否尝试过响应式扩展?

http://msdn.microsoft.com/en-us/data/gg577609.aspx

Rx 是微软的一项新技术,重点如官网所述:

反应式扩展 (Rx)... ...是一个可以编写的库 使用可观察集合的异步和基于事件的程序 LINQ 样式的查询运算符。

您可以将其作为 Nuget 包下载

https://nuget.org/packages/Rx-Main/1.0.11226

由于我现在正在学习Rx,所以我想拿这个例子并为其编写代码,我最终得到的代码实际上并不是并行执行的,而是完全异步的,并保证源代码行按顺序执行.

也许这不是最好的实现,但就像我说的我正在学习 Rx,(线程安全应该是一个很好的改进)

这是我用来从后台线程返回数据的 DTO

class MyItem
{
    public string Line { get; set; }
    public int CurrentThread { get; set; }
}

这些是做实际工作的基本方法,我用一个简单的Thread.Sleep 模拟时间,并返回用于执行每个方法Thread.CurrentThread.ManagedThreadId 的线程。注意ProcessLine的定时器是4秒,这是最耗时的操作

private IEnumerable<MyItem> ReadLinesFromFile(string fileName)
{
    var source = from e in Enumerable.Range(1, 10)
                 let v = e.ToString()
                 select v;

    foreach (var item in source)
    {
        Thread.Sleep(1000);
        yield return new MyItem { CurrentThread = Thread.CurrentThread.ManagedThreadId, Line = item };
    }
}

private MyItem UpdateResultToDatabase(string processedLine)
{
    Thread.Sleep(700);
    return new MyItem { Line = "s" + processedLine, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

private MyItem ProcessLine(string line)
{
    Thread.Sleep(4000);
    return new MyItem { Line = "p" + line, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

下面的方法我只是用来更新UI的

private void DisplayResults(MyItem myItem, Color color, string message)
{
    this.listView1.Items.Add(
        new ListViewItem(
            new[]
            {
                message, 
                myItem.Line ,
                myItem.CurrentThread.ToString(), 
                Thread.CurrentThread.ManagedThreadId.ToString()
            }
        )
        {
            ForeColor = color
        }
    );
}

最后这是调用 Rx API 的方法

private void PlayWithRx()
{
    // we init the observavble with the lines read from the file
    var source = this.ReadLinesFromFile("some file").ToObservable(Scheduler.TaskPool);

    source.ObserveOn(this).Subscribe(x =>
    {
        // for each line read, we update the UI
        this.DisplayResults(x, Color.Red, "Read");

        // for each line read, we subscribe the line to the ProcessLine method
        var process = Observable.Start(() => this.ProcessLine(x.Line), Scheduler.TaskPool)
            .ObserveOn(this).Subscribe(c =>
            {
                // for each line processed, we update the UI
                this.DisplayResults(c, Color.Blue, "Processed");

                // for each line processed we subscribe to the final process the UpdateResultToDatabase method
                // finally, we update the UI when the line processed has been saved to the database
                var persist = Observable.Start(() => this.UpdateResultToDatabase(c.Line), Scheduler.TaskPool)
                    .ObserveOn(this).Subscribe(z => this.DisplayResults(z, Color.Black, "Saved"));
            });
    });
}

这个过程完全在后台运行,这是生成的输出:

【讨论】:

【解决方案2】:

在 async/await 世​​界中,你会得到类似的东西:

public async Task ProcessFileAsync(string filename)
{
    var lines = await ReadLinesFromFileAsync(filename);
    var parsed = await ParseLinesAsync(lines);
    await UpdateDatabaseAsync(parsed);
}

然后调用者可以执行 var tasks = filenames.Select(ProcessFileAsync).ToArray();和任何东西(WaitAll、WhenAll 等,取决于上下文)

【讨论】:

  • 但是如何同时运行两个ParseLinesAsync()
【解决方案3】:

使用几个BlockingCollectionHere is an example

这个想法是您创建一个 producer 将数据放入集合中

while (true) {
    var data = ReadData();
    blockingCollection1.Add(data);
}

然后你创建任意数量的从集合中读取的消费者

while (true) {
    var data = blockingCollection1.Take();
    var processedData = ProcessData(data);
    blockingCollection2.Add(processedData);
}

等等

你也可以让 TPL 使用 Parallel.Foreach 来处理消费者的数量

Parallel.ForEach(blockingCollection1.GetConsumingPartitioner(),
                 data => {
                          var processedData = ProcessData(data);
                          blockingCollection2.Add(processedData);
                 });

(注意你需要使用GetConsumingPartitioner 而不是GetConsumingEnumerable (see here)

【讨论】:

    猜你喜欢
    • 2012-06-25
    • 1970-01-01
    • 2012-04-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-20
    相关资源
    最近更新 更多