【问题标题】:Merging Observables in Reactive Extensions?在响应式扩展中合并 Observables?
【发布时间】:2015-03-28 12:39:09
【问题描述】:

刚学RX,想做一个迭代文件系统的程序。这是我想出的有效方法:

using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace ConsoleApplication8
{
    internal class Program
    {
        private static IObservable<string> GetFiles(string folder, string filePattern)
        {
            return Observable.Create<string>(
                o =>
                {
                    var files = Directory.GetFiles(folder, filePattern);

                    foreach (var file in files)
                    {
                        o.OnNext(file);
                    }

                    var folders = Directory.GetDirectories(folder);

                    foreach (var f in folders)
                    {
                        var x = GetFiles(f, filePattern);
                        x.Subscribe(p => { o.OnNext(p); });
                    }

                    o.OnCompleted();

                    return Disposable.Empty;
                });
        }

        private static void Main(string[] args)
        {
            var o = GetFiles(@"d:\temp", "*.*");

            o.Subscribe(p => { Console.WriteLine(p); });

            Console.Read();
        }
    }
}

(注意使用递归,再次调用GetFiles并订阅)

虽然它看起来很笨拙,但我不禁想到我应该使用像 Concat 这样的东西来组合序列,而不是仅仅将它们冒泡。

我还想将 Foreach 更改为 Parallel.ForEach,但我不确定使用 RX 会产生什么后果。我似乎找不到太多文档。

有关如何使用 RX 更好地编写此代码的任何提示?

【问题讨论】:

  • 哇,真不敢相信我从来不知道你可以用这个代替: var files = Directory.GetFiles(folder, filePattern, SearchOption.AllDirectories);简化了很多事情。但原始问题仍然适用。
  • 从文件系统中读取文件本质上是一个枚举——正如您所达到的解决方案所证明的那样。因此,这并不是学习 Rx 的好例子。它不是很“反应性”。更好的做法是创建一个 observable,每当一个文件被添加到一个目录时,它就会产生一个事件。 FileSystemWatcherObservable.FromEventPattern 结合的事件可能是这方面的起点。
  • 好建议。我只是在寻找一些可以尝试使用 RX 的东西,我会给你一个建议。谢谢。

标签: c# system.reactive


【解决方案1】:

要解决这样的问题,首先编写函数的 LINQ 版本会有所帮助。例如:

    static IEnumerable<string> GetFiles(string folder, string filePattern)
    {
        return Directory.GetFiles(folder, filePattern)
            .Concat(Directory.GetDirectories(folder).SelectMany(f => GetFilesEnumerable(f, filePattern)));
    }

然后只需将 IEnumerables 更改为 IObservables:

    static IObservable<string> GetFiles(string folder, string filePattern)
    {
        return Directory.GetFiles(folder, filePattern).ToObservable()
            .Concat(Directory.GetDirectories(folder).ToObservable().SelectMany(f => GetFilesEnumerable(f, filePattern)));
    }

【讨论】:

  • 非常好的思考方式。比我标记为答案的问题更好地回答原始问题。谢谢!
【解决方案2】:

事实证明,根据我对 SearchOption.AllDirectories 的发现(为什么我这么多年来从未注意到这一点),它可以归结为两行真正的代码:

using System;
using System.IO;
using System.Reactive.Linq;

namespace ConsoleApplication8
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            var o = Directory.GetFiles(@"e:\code", "*.*", SearchOption.AllDirectories).ToObservable();
            o.Subscribe(f => Console.WriteLine(f));


            Console.Read();
        }
    }
}

现在真的很简单。需要一个新的 RX 问题来解决。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-09-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多