【问题标题】:Wrap a file watcher in reactive extensions在响应式扩展中包装文件观察器
【发布时间】:2012-12-19 00:03:35
【问题描述】:

我一直在考虑将文件观察程序包装在可观察对象中以帮助处理事件,但我在弄清楚如何从中获得我想要的行为时遇到了一些麻烦。文件监视器监视放置文件的目录。当第一次将文件放入该目录时,文件观察程序会触发 Created 事件。但是,如果文件很大或网络连接速度很慢,则在文件更新时会触发一系列 Changed 事件。我不想在文件写完之前处理文件,所以我真正需要的是这个时间线

|Created    |Changed   |Changed   |Changed                      
________________________________________________ 
^Write starts       ^Write finishes       ^Processing Starts    

我查看了许多在 Rx 中过滤事件的方法,但我无法得到我需要的“一旦文件文件在 X 秒内没有更改就触发函数”。油门不好,因为它会在中间丢失事件。缓冲区不好,因为事件可能发生在缓冲区边界上。

我曾考虑过使用超时,但我并没有因为它们引发异常而疯狂,我希望在写入文件时开始处理,而不是在没有更多事件时开始。

Reactive Extensions vs FileSystemWatcher 有一个类似的问题,但从未真正解决。

有没有一种方法可以让我轻松做到这一点?我确信这不是一个不常见的用例。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    ObservableFileSystemWatcher - FileSystemWatcher type 周围的可观察包装器 - 完美运行。添加一个名为ReactiveFileSystemWatcher的NuGet包并创建一个控制台应用程序进行测试,如下所示

    class Program
    {
      static void Main(string[] args)
      {
        using (var watcher = new ObservableFileSystemWatcher(c => { c.Path = @"C:\FolderToWatch\"; c.IncludeSubdirectories = true; }))
        {
          watcher.Created.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
          watcher.Changed.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
          watcher.Renamed.Select(x => $"{x.OldName} was {x.ChangeType} to {x.Name}").Subscribe(Console.WriteLine);
          watcher.Deleted.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
          watcher.Errors.Subscribe(Console.WriteLine);
          watcher.Start();
          Console.ReadLine();
        }
      }
    }
    

    【讨论】:

      【解决方案2】:

      编辑:审查后,不认为你想要这个......

      也许我有点过于简单化了,但Throttle 在这里不是很理想吗?

      这绝不是“简单”,但我认为它比我之前的想法更接近你想要的:

      (奖励:有一个测试用例!;))

      void Main()
      {
          var pathToWatch = @"c:\temp\";
          var fsw = new FileSystemWatcher(pathToWatch);
      
          // set up observables for create and changed
          var changedObs = 
             Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                dlgt => fsw.Changed += dlgt, 
                dlgt => fsw.Changed -= dlgt);
          var createdObs = 
             Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>( 
                dlgt => fsw.Created += dlgt, 
                dlgt => fsw.Created -= dlgt);
      
          // the longest we'll wait between last file write and calling it "changed"
          var maximumTimeBetweenWrites = TimeSpan.FromSeconds(1);
      
          // A "pulse" ticking off every 10ms (adjust this as desired)
          var timer = Observable
              .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10))
              .Select(i => DateTime.Now);
      
          var watcher = 
              from creation in createdObs
              from change in changedObs
                  // we only care about changes matching a create
                  .Where(changeEvt => changeEvt.EventArgs.Name == creation.EventArgs.Name)
                  // take latest of (pulse, changes) and select (event, time since last file write)
                  .CombineLatest(timer, (evt, now) => new {
                          Change = evt, 
                          DeltaFromLast = now.Subtract(new FileInfo(evt.EventArgs.FullPath).LastWriteTime)})
                  // skip all until we trigger than "time before considered changed" threshold
                  .SkipWhile(evt => evt.DeltaFromLast < maximumTimeBetweenWrites)
                  // Then lock on that until we change a diff file
                  .Distinct(evt => evt.Change.EventArgs.FullPath)
              select change.Change;
      
          var disp = new CompositeDisposable();
      
          // to show creates
          disp.Add(
              createdObs.Subscribe(
                 evt => Console.WriteLine("New file:{0}", 
                      evt.EventArgs.FullPath)));
      
          // to show "final changes"
          disp.Add(
              watcher.Subscribe(
                 evt => Console.WriteLine("{0}:{1}:{2}", 
                       evt.EventArgs.Name, 
                       evt.EventArgs.ChangeType, 
                       evt.EventArgs.FullPath)));
      
          fsw.EnableRaisingEvents = true;
      
          var rnd = new Random();
          Enumerable.Range(0,10)
              .AsParallel()
              .ForAll(i => 
                  {
                      var filename = Path.Combine(pathToWatch, "foo" + i + ".txt");
                      if(File.Exists(filename))
                          File.Delete(filename);
      
                      foreach(var j in Enumerable.Range(0, 20))
                      {
                          var writer = File.AppendText(filename);
                          writer.WriteLine(j);
                          writer.Close();
                          Thread.Sleep(rnd.Next(500));
                      }
                  });
      
          Console.WriteLine("Press enter to quit...");
          Console.ReadLine();
          disp.Dispose();        
      }
      

      【讨论】:

      • 我没有在我的单元测试中抛出这个,但我认为它不会起作用,因为更改的事件并不总是被触发。顺序是您有一个 Created 事件,后跟 0 个或多个 Changed 事件。我也不相信这解决了您可能有一个持续时间超过 5 秒的文件写入的可能性。
      • @stimms 实际上,完全忘记了我回答了这个问题……仔细看,我想这不是你想要的……让我更新一下……跨度>
      • @stimms 好的,对事物有了新的看法——“创建”应该让您在文件进入时开始处理文件,“最终更改”应该在文件“完成”时发出信号跨度>
      • 非常感谢您为此付出的努力。我想我终于得到了一些可以使用 BufferWithInactivity 的东西,但我仅仅阅读你的代码就学到了很多东西。
      • @stimms 不用担心,实际上写起来很有趣(好吧,也许不好玩……引人入胜)。 :)
      【解决方案3】:

      看看我在this answer中的BufferWithInactivity扩展方法。

      我认为您可以使用它来查找已更改事件中的不活动状态。

      【讨论】:

      • 我使用了不活动的缓冲区以及按功能分组来实现我想要的。
      【解决方案4】:

      查看 NuGet 包Reactive FileSystemWatcher

      源代码在GitHub page

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-07-31
        • 2018-09-20
        • 1970-01-01
        相关资源
        最近更新 更多