【问题标题】:Parallel Producer / Consumer with fault tolerance?具有容错能力的并行生产者/消费者?
【发布时间】:2011-04-14 12:06:30
【问题描述】:

我需要使用 SqlBulkCopy 将大型 csv 文件分块到几个不同的数据库插入中。我打算通过 2 个单独的任务来执行此操作,1 个用于批处理 CSV 文件,另一个用于插入数据库。作为一个例子,我就是这样的:

public class UberTask
{
    private readonly BlockingCollection<Tuple<string,int>> _store = new BlockingCollection<Tuple<string, int>>();

    public void PerformTask()
    {
        var notifier = new UINotifier();
        Task.Factory.StartNew(() =>
                                  {
                                      for (int i =0; i < 10; i++)
                                      {
                                          string description = string.Format("Scenario {0}", i);

                                          notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Reading '{0}' from file", description)));

                                          // represents reading the CSV file.
                                          Thread.Sleep(500);
                                          notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Enqueuing '{0}'", description)));
                                          _store.Add(new Tuple<string, int>(description, i));
                                      }
                                      _store.CompleteAdding();
                                  });

        var consumer = Task.Factory.StartNew(() =>
                                                 {
                                                     foreach (var item in _store.GetConsumingEnumerable())
                                                     {
                                                         var poppedItem = item;
                                                         notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Sending '{0}' to the database", poppedItem.Item1)));
                                                         // represents sending stuff to the database.
                                                         Thread.Sleep(1000);
                                                     }
                                                 });
        consumer.Wait();
        Console.WriteLine("complete");
    }
}

这是配对 2 组相关任务的好方法吗?上面的代码没有处理什么(它需要):

  • 如果代表 CSV 读取的任务出现故障,则需要停止其他任务(即使 _store 中仍有项目。)
  • 如果代表数据库的Task插入故障,其他进程可以停止处理。
  • 如果配对任务中的任何一个出现故障,我将需要执行一些操作来回滚数据库更新(我不担心我将如何回滚),更多的问题是我如何编写“故障发生在其中一项配对任务,所以我需要整理一下”。

任何关于上述内容的帮助将不胜感激!

【问题讨论】:

    标签: c# multithreading task task-parallel-library


    【解决方案1】:

    您可以使用异常处理和取消标记来执行此操作。当管道阶段检测到错误时,它会捕获它并设置令牌。这将取消其他阶段。 finally 块确保调用 CompleteAdding()。这很重要,因为接收管道阶段可能会在等待集合时被阻塞,并且在解除阻塞之前不会处理取消。

    您还希望在您的集合中释放任何未处理的对象,或者在您的情况下,在管道阶段完成(最终)和/或整个管道关闭时清理您的数据库连接。

    这是执行此操作的管道阶段的示例:

        static void LoadPipelinedImages(IEnumerable<string> fileNames, 
                                        string sourceDir, 
                                        BlockingCollection<ImageInfo> original,
                                        CancellationTokenSource cts)
        {
            // ...
            var token = cts.Token;
            ImageInfo info = null;
            try
            {
                foreach (var fileName in fileNames)
                {
                    if (token.IsCancellationRequested)
                        break;
                    info = LoadImage(fileName, ...);
                    original.Add(info, token);
                    info = null;
                }                
            }
            catch (Exception e)
            {
                // in case of exception, signal shutdown to other pipeline tasks
                cts.Cancel();
                if (!(e is OperationCanceledException))
                    throw;
            }
            finally
            {
                original.CompleteAdding();
                if (info != null) info.Dispose();
            }
        }
    

    整个管道代码如下所示。它还支持通过设置取消令牌从外部(从 UI)取消管道。

        static void RunPipelined(IEnumerable<string> fileNames, 
                                 string sourceDir, 
                                 int queueLength, 
                                 Action<ImageInfo> displayFn,
                                 CancellationTokenSource cts)
        {
            // Data pipes 
            var originalImages = new BlockingCollection<ImageInfo>(queueLength);
            var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength);
            var filteredImages = new BlockingCollection<ImageInfo>(queueLength);
            try
            {
                var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                        TaskContinuationOptions.None);
                // ...
    
                // Start pipelined tasks
                var loadTask = f.StartNew(() =>
                      LoadPipelinedImages(fileNames, sourceDir, 
                                          originalImages, cts));
    
                var scaleTask = f.StartNew(() =>
                      ScalePipelinedImages(originalImages, 
                                           thumbnailImages, cts));
    
                var filterTask = f.StartNew(() =>
                      FilterPipelinedImages(thumbnailImages, 
                                            filteredImages, cts));
    
                var displayTask = f.StartNew(() =>
                      DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(), 
                           ... cts));
    
                Task.WaitAll(loadTask, scaleTask, filterTask, displayTask);
            }
            finally
            {
                // in case of exception or cancellation, there might be bitmaps
                // that need to be disposed.
                DisposeImagesInQueue(originalImages);
                DisposeImagesInQueue(thumbnailImages);
                DisposeImagesInQueue(filteredImages);                
            }
        }
    

    有关完整示例,请参阅此处下载的管道示例:

    http://parallelpatterns.codeplex.com/releases/view/50473

    在这里讨论:

    http://msdn.microsoft.com/en-us/library/ff963548.aspx

    【讨论】:

    • 感谢阿德的详细解答!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-19
    相关资源
    最近更新 更多