【问题标题】:File Monitoring System Reactive Programming文件监控系统反应式编程
【发布时间】:2018-04-09 12:02:29
【问题描述】:

我正在使用 C#。我是响应式编程的新手。 使用反应式编程,我想创建一个文件夹监视系统,如果文件夹 A 包含任何文件,如果是,它将调用该文件,然后它将抓取该文件并处理它并将其移动到文件夹 B 中。 假设文件夹A首先是空的。用户将一些文件实时添加到文件夹A中。系统检测到新文件已添加,它将一个一个或同时处理。 我无法理解我应该使用 Create 或 Interval 什么,然后我的处理代码将在哪里编写 请帮帮我

【问题讨论】:

标签: c# system.reactive reactivex reactive


【解决方案1】:

这应该相当接近:

var query =
    Observable
        .Using(
            () =>
            {
                var fsw = new FileSystemWatcher(@"C:\A");
                fsw.EnableRaisingEvents = true;
                return fsw;
            },
            fsw => Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                h => fsw.Created += h,
                h => fsw.Created -= h))
        .Delay(TimeSpan.FromSeconds(0.1));


query
    .Subscribe(x => File.Move(x.EventArgs.FullPath, Path.Combine(@"C:\B", x.EventArgs.Name)));

【讨论】:

  • 它已经接近但还没有,因为你永远不会启动FileSystemWatcher(带有EnableRaisingEvents 属性)。
  • @Evk - 这次我测试了它。
  • 为什么要延迟 100 毫秒?
  • @Evk - 在 Windows 中复制文件的行为使文件句柄保持打开一小段时间。需要延迟才能让操作系统在移动文件之前关闭文件。
  • 好的,但是如果我决定在那里复制 2GB 文件,这将失败。无论如何,正确处理FileSystemWatcher 事件可能超出了这个问题的范围。
【解决方案2】:

FileSystemWatcher 有一个相对较小的InternalBufferSize(默认为 8 KB,最大为 64 KB),如果在短时间内发生文件系统更改的突发,并且FileSystemWatcher 正在做任何耗时的事情。 documentation 给出了这样的建议:

让您的事件处理代码尽可能短。

超出缓冲区的后果很严重:所有缓冲的通知都会丢失。在大多数情况下,这应该是非常不可取的,如果不是完全不可接受的话。因此,需要避免在事件调用的同一线程上同步执行繁重的文件移动操作。实现理想异步的一种简单方法是在处理程序和订阅代码之间注入Delay。一种更复杂的方法是将传入的通知排队,并按顺序或以有限的并发处理每个文件。 Merge 运算符可用于排队和并发控制。这是一个例子¹:

IObservable<Unit> query = Observable
    .Using(() =>
        {
            var fsw = new FileSystemWatcher(@"C:\A");
            fsw.EnableRaisingEvents = true;
            return fsw;
        },
        fsw => Observable.FromEventPattern<FileSystemEventHandler,
            FileSystemEventArgs>(h => fsw.Created += h, h => fsw.Created -= h)
    )
    .Delay(TimeSpan.FromSeconds(0.1))
    .Select(x => Observable.Defer(() => Observable.Start(() =>
    {
        File.Move(x.EventArgs.FullPath, Path.Combine(@"C:\B", x.EventArgs.Name));
    })))
    .Merge(maxConcurrent: 2);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

Task<Unit> task = query.ToTask(cts.Token); // Start the file-watching

Observable.Defer+Observable.Start 组合用作异步 Observable.FromAsync 的同步等效项(因为 File.Move 方法是同步的)。

¹ 它是 Enigmativity 的 example 的修改版。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2010-12-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-09-14
    • 2023-01-13
    • 1970-01-01
    相关资源
    最近更新 更多