【问题标题】:Thread safe StreamWriter C# how to do it? 2线程安全的StreamWriter C#怎么做呢? 2
【发布时间】:2011-04-05 05:07:03
【问题描述】:

所以这是我上一个问题的延续 - 所以问题是 “就需要将双精度值写入文件而言,构建线程安全程序的最佳方法是什么。如果通过流写入器保存值的函数被多个线程调用?什么是最好的方法呢? ?”

我修改了一些在 MSDN 上找到的代码,下面的呢?这个正确地将所有内容写入文件。

namespace SafeThread
{
    class Program
    {
        static void Main()
        {
            Threading threader = new Threading();

            AutoResetEvent autoEvent = new AutoResetEvent(false);

            Thread regularThread =
                new Thread(new ThreadStart(threader.ThreadMethod));
            regularThread.Start();

            ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod),
                autoEvent);

            // Wait for foreground thread to end.
            regularThread.Join();

            // Wait for background thread to end.
            autoEvent.WaitOne();
        }
    }


    class Threading
    {
        List<double> Values = new List<double>();
        static readonly Object locker = new Object();
        StreamWriter writer = new StreamWriter("file");
        static int bulkCount = 0;
        static int bulkSize = 100000;

        public void ThreadMethod()
        {
            lock (locker)
            {
                while (bulkCount < bulkSize)
                    Values.Add(bulkCount++);
            }
            bulkCount = 0;
        }

        public void WorkMethod(object stateInfo)
        {
            lock (locker)
            {
                foreach (double V in Values)
                {
                    writer.WriteLine(V);
                    writer.Flush();
                }
            }
            // Signal that this thread is finished.
            ((AutoResetEvent)stateInfo).Set();
        }
    }
}

【问题讨论】:

    标签: c# thread-safety writer


    【解决方案1】:

    ThreadQueueUserWorkItem最低可用的线程API。除非我绝对,最后,别无选择,否则我不会使用它们。尝试使用Task 类以获得更高级别的抽象。详情请see my recent blog post on the subject

    您也可以将BlockingCollection&lt;double&gt; 用作适当的生产者/消费者队列,而不是尝试使用用于同步的最低可用API手动构建。

    正确地重新发明这些轮子非常困难。我强烈建议使用为此类需求设计的类(具体来说是TaskBlockingCollection)。它们内置于 .NET 4.0 框架和 are available as an add-on for .NET 3.5

    【讨论】:

      【解决方案2】:
      • 代码将编写器作为实例变量,但使用静态锁定器。如果您有多个实例写入不同的文件,那么它们没有理由需要共享同一个锁
      • 在相关说明中,由于您已经拥有 writer(作为私有实例 var),因此您可以将其用于锁定,而不是在这种情况下使用单独的 locker 对象 - 这会使事情变得更简单。

      “正确答案”实际上取决于您在锁定/阻塞行为方面要寻找的内容。例如,最简单的方法是跳过中间数据结构,只使用一个 WriteValues 方法,这样每个线程“报告”其结果就会继续并将它们写入文件。比如:

      StreamWriter writer = new StreamWriter("file");
      public void WriteValues(IEnumerable<double> values)
      {
          lock (writer)
          {
              foreach (var d in values)
              {
                  writer.WriteLine(d);
              }
              writer.Flush();
          }
      }
      

      当然,这意味着工作线程在其“报告结果”阶段进行序列化 - 根据性能特征,这可能还不错(例如,生成 5 分钟,写入 500 毫秒)。

      另一方面,您可以让工作线程写入数据结构。如果您使用的是 .NET 4,我建议您只使用 ConcurrentQueue 而不是自己锁定。

      此外,您可能希望以比工作线程报告的更大的批次执行文件 i/o,因此您可能会选择仅以某种频率在后台线程中写入。频谱的末端看起来如下所示(您将在实际代码中删除 Console.WriteLine 调用,这些调用就在那里,因此您可以看到它正在运行)

      public class ThreadSafeFileBuffer<T> : IDisposable
      {
          private readonly StreamWriter m_writer;
          private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>();
          private readonly Timer m_timer;
      
          public ThreadSafeFileBuffer(string filePath, int flushPeriodInSeconds = 5)
          {
              m_writer = new StreamWriter(filePath);
              var flushPeriod = TimeSpan.FromSeconds(flushPeriodInSeconds);
              m_timer = new Timer(FlushBuffer, null, flushPeriod, flushPeriod);
          }
      
          public void AddResult(T result)
          {
              m_buffer.Enqueue(result);
              Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count);
          }
      
          public void Dispose()
          {
              Console.WriteLine("Turning off timer");
              m_timer.Dispose();
              Console.WriteLine("Flushing final buffer output");
              FlushBuffer(); // flush anything left over in the buffer
              Console.WriteLine("Closing file");
              m_writer.Dispose();
          }
      
          /// <summary>
          /// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock
          /// </summary>
          /// <param name="unused"></param>
          private void FlushBuffer(object unused = null)
          {
              T current;
              while (m_buffer.TryDequeue(out current))
              {
                  Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count);
                  m_writer.WriteLine(current);
              }
              m_writer.Flush();
          }
      }
      
      class Program
      {
          static void Main(string[] args)
          {
              var tempFile = Path.GetTempFileName();
              using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile))
              {
                  Parallel.For(0, 100, i =>
                  {
                      // simulate some 'real work' by waiting for awhile
                      var sleepTime = new Random().Next(10000);
                      Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime);
                      Thread.Sleep(sleepTime);
                      resultsBuffer.AddResult(Math.PI*i);
                  });
              }
              foreach (var resultLine in File.ReadAllLines(tempFile))
              {
                  Console.WriteLine("Line from result: {0}", resultLine);
              }
          }
      }
      

      【讨论】:

        【解决方案3】:

        所以你是说你想要一堆线程使用 StreamWriter 将数据写入单个文件?简单。只需锁定 StreamWriter 对象。

        这里的代码将创建 5 个线程。每个线程将执行 5 个“动作”,并在每个动作结束时将 5 行写入一个名为“file”的文件。

        using System;
        using System.Collections.Generic;
        using System.IO;
        using System.Threading;
        
        namespace ConsoleApplication1 {
            class Program {
                static void Main() {
                    StreamWriter Writer = new StreamWriter("file");
        
                    Action<int> ThreadProcedure = (i) => {
                        // A thread may perform many actions and write out the result after each action
                        // The outer loop here represents the multiple actions this thread will take
                        for (int x = 0; x < 5; x++) {
                            // Here is where the thread would generate the data for this action
                            // Well simulate work time using a call to Sleep
                            Thread.Sleep(1000);
                            // After generating the data the thread needs to lock the Writer before using it.
                            lock (Writer) {
                                // Here we'll write a few lines to the Writer
                                for (int y = 0; y < 5; y++) {
                                    Writer.WriteLine("Thread id = {0}; Action id = {1}; Line id = {2}", i, x, y);
                                }
                            }
                        }
                    };
        
                    //Now that we have a delegate for the thread code lets make a few instances
        
                    List<IAsyncResult> AsyncResultList = new List<IAsyncResult>();
                    for (int w = 0; w < 5; w++) {
                        AsyncResultList.Add(ThreadProcedure.BeginInvoke(w, null, null));
                    }
        
                    // Wait for all threads to complete
                    foreach (IAsyncResult r in AsyncResultList) {
                        r.AsyncWaitHandle.WaitOne();
                    }
        
                    // Flush/Close the writer so all data goes to disk
                    Writer.Flush();
                    Writer.Close();
                }
            }
        }
        

        结果应该是一个包含 125 行的文件“文件”,其中所有“动作”同时执行,每个动作的结果同步写入文件。

        【讨论】:

        • 你不应该锁定那些你不能像那样控制它们的实现的对象——如果它在内部将自己锁定在另一个线程中怎么办? - 你应该创建一个新的Object 用作锁。
        • 不要为锁使用单独的对象。直接锁定一个对象是确保所有可以看到一个对象的线程都能获得该对象的排他锁的唯一方法。
        • @LunaticExperimentalist:直接锁定对象也不能确保所有线程都获得排他锁。如果开发人员忘记将对象锁定在特定位置,则无论所有其他线程使用什么对象进行锁定,他都将拥有不受控制的访问权限。
        【解决方案4】:

        您那里的代码被巧妙地破坏了 - 特别是,如果排队的工作项首先运行,那么它将在终止之前立即刷新(空)值列表,之后您的工作人员会去填充列表(最终将被忽略)。自动重置事件也不做任何事情,因为从来没有人查询或等待它的状态。

        另外,由于每个线程使用一个不同的锁,锁没有任何意义!您需要确保在访问流写入器时持有单个共享锁。您不需要在刷新代码和生成代码之间加锁;您只需要确保在生成完成后刷新运行。

        不过,您可能走在正确的轨道上 - 尽管我会使用固定大小的数组而不是列表,并在数组满时刷新所有条目。这样可以避免线程长期存在时内存不足的可能性。

        【讨论】:

        • 如果列表为空怎么会刷新?
        • 我在回答中解释了这一点 - 排队的工作项 WorkMethod 可以在线程 ThreadMethod 之前运行之前。它也可以追赶。您无法预测哪个,因为您没有在此处设置任何类型的显式排序。
        猜你喜欢
        • 2011-04-05
        • 1970-01-01
        • 2016-01-31
        • 2012-05-26
        • 1970-01-01
        • 2014-06-16
        • 2016-06-17
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多