【问题标题】:MaxDegreeOfParallelism with Task.Factory.StartNew()MaxDegreeOfParallelism 与 Task.Factory.StartNew()
【发布时间】:2016-04-22 16:31:41
【问题描述】:

我有一个程序,我可以在其中从 Internet 下载文件并进行处理。以下是我编写的使用线程下载文件的函数。

Task<File> re = Task.Factory.StartNew(() => { /*Download the File*/ });
re.ContinueWith((x) => { /*Do another function*/ });

我现在希望它只使用 10 个线程进行下载。我查看了 ParallelOptions.MaxDegreeOfParallelism 属性,但我不明白当任务返回结果时如何使用它。

【问题讨论】:

  • 你要并行下载10个文件吗?文件下载完成后,您继续使用 CPU 密集型的其他功能吗?
  • 我只希望在给定时间内并行下载 10 个文件。大约有 100 个文件,另一个功能是 CPU 密集型的。

标签: c# multithreading parallel-processing task-parallel-library task


【解决方案1】:

一个好方法是使用DataFlow API。要使用它,您必须安装Microsoft.Tpl.Dataflow Nuget package

假设您有以下几种数据下载和处理方法:

public async Task<DownloadResult> DownloadFile(string url)
{
    //Asynchronously download the file and return the result of the download.
    //You don't need a thread to download the file if you use asynchronous API.
}

public ProcessingResult ProcessDownloadResult(DownloadResult download_result)
{
    //Synchronously process the download result and produce a ProcessingResult.    
}

假设您有一个要下载的 URL 列表:

List<string> urls = new List<string>();

然后您可以使用 DataFlow API 执行以下操作:

TransformBlock<string,DownloadResult> download_block =
    new TransformBlock<string, DownloadResult>(
        url => DownloadFile(url),
        new ExecutionDataflowBlockOptions
        {
            //Only 10 asynchronous download operations
            //can happen at any point in time.
            MaxDegreeOfParallelism = 10
        });

TransformBlock<DownloadResult, ProcessingResult> process_block =
    new TransformBlock<DownloadResult, ProcessingResult>(
        dr => ProcessDownloadResult(dr),
        new ExecutionDataflowBlockOptions
        {
            //We limit the number of CPU intensive operation
            //to the number of processors in the system.
            MaxDegreeOfParallelism = Environment.ProcessorCount
        });

download_block.LinkTo(process_block);

foreach(var url in urls)
{
    download_block.Post(url);
}

【讨论】:

  • 对否决票有何评论?我真的很想知道这段代码是否有问题。
【解决方案2】:

你可以使用类似的东西:

Func<File> work = () => {
    // Do something
    File file = ...
    return file
};

var maxNoOfWorkers = 10;    
IEnumerable<Task> tasks = Enumerable.Range(0, maxNoOfWorkers)
    .Select(s =>
    {
        var task = Task.Factory.StartNew<File>(work);
        return task.ContinueWith(ant => { /* do soemthing else */ });        
    });

这种方式TPL 决定从threadpool 获取多少线程,但如果您真的想创建一个专用 (non-threadpool) 线程,您可以使用以下方法:

IEnumerable<Task> tasks = Enumerable.Range(0, maxNoOfWorkers)
    .Select(s =>
    {
        var task = Task.Factory.StartNew<File>(
            work, 
            CancellationToken.None, 
            TaskCreationOptions.LongRunning, 
            TaskScheduler.Default);
        return task.ContinueWith(ant => { /* do soemthing else */ });
    });

您的其他选择是使用PLINQParaller.For/ForEach,您可以使用MaxDegreeOfParallelism

PLINQ 示例可以是:

Func<File> work = () => {
    // Do something
    File file = ...
    return file
};

var maxNoOfWorkers = 10;
ParallelEnumerable.Range(0, maxNoOfWorkers)
    .WithDegreeOfParallelism(maxNoOfWorkers)
    .ForAll(x => { 
        var file = work();
        // Do something with file
    });

当然,我不知道您示例的上下文,因此您可能需要根据您的要求对其进行调整。

【讨论】:

    猜你喜欢
    • 2011-06-27
    • 2012-03-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-31
    • 2013-06-04
    • 1970-01-01
    相关资源
    最近更新 更多