【问题标题】:Producer Consumer model using TPL, Tasks in .net 4.0使用 TPL 的生产者消费者模型,.net 4.0 中的任务
【发布时间】:2011-10-18 10:05:26
【问题描述】:

我有一个相当大的 XML 文件(大约 1-2GB)。

要求是将xml数据持久化到数据库中。 目前,这是通过 3 个步骤实现的。

  1. 尽可能读取内存占用少的大文件
  2. 从 xml 数据创建实体
  3. 使用 SqlBulkCopy 将创建的实体中的数据存储到数据库中。

为了获得更好的性能,我想创建一个生产者-消费者模型,其中生产者创建一组实体,例如一批 10K 并将其添加到队列中。消费者应该从队列中取出这批实体,并使用 sqlbulkcopy 持久化到数据库中。

谢谢, 悟空

void Main()
{
    int iCount = 0;
    string fileName = @"C:\Data\CatalogIndex.xml";

    DateTime startTime = DateTime.Now;
    Console.WriteLine("Start Time: {0}", startTime);
    FileInfo fi = new FileInfo(fileName);
    Console.WriteLine("File Size:{0} MB", fi.Length / 1048576.0);

/* I want to change this loop to create a producer consumer pattern here to process the data parallel-ly
*/
     foreach (var element in StreamElements(fileName,"title"))
            {
                iCount++;
            }

            Console.WriteLine("Count: {0}", iCount);
            Console.WriteLine("End Time: {0}, Time Taken:{1}", DateTime.Now, DateTime.Now - startTime);
        }

    private static IEnumerable<XElement> StreamElements(string fileName, string elementName)
    { 
        using (var rdr = XmlReader.Create(fileName))
        {
            rdr.MoveToContent();
            while (!rdr.EOF)
            {
                if ((rdr.NodeType == XmlNodeType.Element) && (rdr.Name == elementName))
                {
                    var e = XElement.ReadFrom(rdr) as XElement;
                    yield return e;
                }
                else
                {
                    rdr.Read();
                }
            }
            rdr.Close();
        }
    }

【问题讨论】:

  • 我尝试使用 Thread 本地存储,但到目前为止,我只使用单线程方法,即调用一个方法来创建实体,然后将它们同步持久化。

标签: c# multithreading .net-4.0 task-parallel-library


【解决方案1】:

这是你想要做的吗?

    void Main()
    {
        const int inputCollectionBufferSize = 1024;
        const int bulkInsertBufferCapacity = 100;
        const int bulkInsertConcurrency = 4;

        BlockingCollection<object> inputCollection = new BlockingCollection<object>(inputCollectionBufferSize);

        Task loadTask = Task.Factory.StartNew(() =>
        {
            foreach (object nextItem in ReadAllElements(...))
            {
                // this will potentially block if there are already enough items
                inputCollection.Add(nextItem);
            }

            // mark this collection as done
            inputCollection.CompleteAdding();
        });

        Action parseAction = () =>
        {
            List<object> bulkInsertBuffer = new List<object>(bulkInsertBufferCapacity);

            foreach (object nextItem in inputCollection.GetConsumingEnumerable())
            {
                if (bulkInsertBuffer.Length == bulkInsertBufferCapacity)
                {
                    CommitBuffer(bulkInsertBuffer);
                    bulkInsertBuffer.Clear();
                }

                bulkInsertBuffer.Add(nextItem);
            }
        };

        List<Task> parseTasks = new List<Task>(bulkInsertConcurrency);

        for (int i = 0; i < bulkInsertConcurrency; i++)
        {
            parseTasks.Add(Task.Factory.StartNew(parseAction));
        }

        // wait before exiting
        loadTask.Wait();
        Task.WaitAll(parseTasks.ToArray());
    }

【讨论】:

  • 感谢您提供伪代码。这是一个好的开始。我们如何使 CommitBuffer 方法在 X 个线程中并行执行。由于在耗时的操作中将数据持久化到数据库中,我想将其作为具有可配置线程数的多线程操作来执行。我将尝试实现此代码并在此处更新。
  • @user943141 - 这不会有太大的不同。如果你真的想要,你可以多次启动 parseTasks。但是,数据库操作是重 IO 可靠操作。您可能只会使用多个线程来减慢速度,因为 IO 是瓶颈并且同步很重
  • 请参考这篇文章sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/… Alberto 创建了一个 PC 模型来执行数据的批量插入。在这种情况下,消费者是多线程的并执行 sqlbulkcopy。
  • 因为我现在在中国没有我的VPN,所以我没有红帖子,但我已经编辑了我的代码以支持多个解析任务。我仍然认为它不会有任何好处,但我会把这个决定留给你
猜你喜欢
  • 2011-09-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-01-08
  • 2018-07-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多