【问题标题】:Converting event based API to Rx.Net将基于事件的 API 转换为 Rx.Net
【发布时间】:2018-06-23 11:52:15
【问题描述】:

我正在尝试将现有的基于事件的 API 转换为 Reactive Observable API。我正在使用的具体 API 是 Xamarin.iOS 中的 NSNetServiceBrowser。此 API 让您可以使用 Zeroconf/Bonjour 浏览网络设备。但是,该问题适用于任何此类 API。

NsNetServiceBrowser 提供各种感兴趣的事件: - FoundService - NotSearched - ServiceRemoved

发现服务时引发FoundService事件,搜索失败时引发NotSearched

我想将 FoundServiceNotSerched 事件组合成 NSNetService 的 observable。

我当前的实现如下所示:

public IObservable<NSNetService> Search()
{
    var foundObservable = Observable
        .FromEventPattern<NSNetServiceEventArgs>(
            h => serviceBrowser.FoundService += h,
            h => serviceBrowser.FoundService -= h)
        .Select(x => x.EventArgs);

    var notSearchedObservable = Observable
        .FromEventPattern<NSNetServiceErrorEventArgs>(
            h => serviceBrowser.NotSearched += h,
            h => serviceBrowser.NotSearched -= h)
        .Select(x => x.EventArgs);

    var serviceObservable = Observable.Create(
        (IObserver<NSNetServiceEventArgs> observer) =>
        {
            notSearchedObservable.Subscribe(n =>
            {
                string errorMessage = $"Search for {serviceType} failed:";
                foreach (var kv in n.Errors)
                {
                    log.Error($"\t{kv.Key}: {kv.Value}");
                    errorMessage += $" ({kv.Key}, {kv.Value})";
                }
                observer.OnError(new Exception(errorMessage));
            });
            foundObservable.Subscribe(observer);
            return System.Reactive.Disposables.Disposable.Empty;
    }).Select(x => x.Service);

    serviceBrowser.SearchForServices(serviceType, domain);
    return serviceObservable;
}

代码看起来很笨拙,我有一种直觉,我没有正确使用System.Reactive?有没有一种更优雅的方式来组合事件对,其中一个正在产生,另一个是发出错误信号?这是 .NET 中现有的基于事件的 API 中的常见模式。


这是一个小型控制台应用程序(仅取决于 System.Reactive),说明了我想要 Reactify 的 API 类型:

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace ReactiveLearning
{
    class Program
    {
        static void Main(string[] args)
        {
            var browser = new ServiceBrowser();

            var observableFound =
                Observable.FromEventPattern<ServiceFoundEventArgs>(
                    h => browser.ServiceFound += h,
                    h => browser.ServiceFound -= h)
                .Select(e => e.EventArgs.Service);

            var observableError =
                Observable.FromEventPattern<ServiceSearchErrorEventArgs>(
                    h => browser.ServiceError += h,
                    h => browser.ServiceError -= h);

            var foundSub = observableFound.Subscribe(s =>
            {
                Console.WriteLine($"Found service: {s.Name}");
            }, () => 
            {
                Console.WriteLine("Found Completed");
            });

            var errorSub = observableError.Subscribe(e =>
            {
                Console.WriteLine("ERROR!");
            }, () => 
            {
                Console.WriteLine("Error Completed");
            });

            browser.Search();

            Console.ReadLine();

            foundSub.Dispose();
            errorSub.Dispose();

            Console.WriteLine();
        }
    }

    class ServiceBrowser
    {
        public EventHandler<ServiceFoundEventArgs> ServiceFound;
        public EventHandler<ServiceSearchErrorEventArgs> ServiceError;

        public void Search()
        {
            Task.Run(async () =>
            {
                for (var i = 0; i < 5; ++i)
                {
                    await Task.Delay(1000);
                    ServiceFound?.Invoke(this, new ServiceFoundEventArgs(new Service($"Service {i}")));
                }

                var r = new Random();
                if (r.NextDouble() > 0.5)
                {
                    ServiceError?.Invoke(this, new ServiceSearchErrorEventArgs());
                }
            });
        }
    }

    class ServiceFoundEventArgs : EventArgs
    {
        public Service Service { get; private set;  }
        public ServiceFoundEventArgs(Service service) => Service = service;
    }

    class ServiceSearchErrorEventArgs : EventArgs {}

    class Service
    {
        public event EventHandler<EventArgs> AddressResolved;
        public event EventHandler<EventArgs> ErrorResolvingAddress;
        public string Name { get; private set; }
        public string Address { get; private set; }
        public Service(string name) => Name = name;

        public void ResolveAddress()
        {
            Task.Run(async () =>
            {
                await Task.Delay(500);
                var r = new Random();
                if (r.NextDouble() > 0.5)
                {                    
                    Address = $"http://{Name}.com";
                    AddressResolved?.Invoke(this, EventArgs.Empty);
                }
                else
                {
                    ErrorResolvingAddress?.Invoke(this, EventArgs.Empty);
                }
            });
        }
    }
}

【问题讨论】:

  • 是的,每当你这样做 return Disposable.Empty; 时,你就做错了。
  • 它会像return foundObservable.Select(x =&gt; x.Service).Merge(notSearchedObservable.Select(x =&gt; x.Service)); 这样简单,但是你还没有发布你的类定义,所以我不能肯定地说。你能发一个minimal reproducible example吗?
  • @Enigmativity 我不想合并它们 - 我想要一个 Observable,其中由 notSearchedObservable 生成的元素会引发错误(不生成元素)。我会看看我是否可以创建一个我想要“反应”的通用小例子
  • 这是一个小型控制台应用程序(仅取决于 System.Reactive),说明了我想要 Reactify 的 API 类型:gist.github.com/follesoe/60669ea2c5112cd733181d7870ba93da
  • 如果出现错误应该立即停止您的 observable 还是应该继续并能够报告多个错误?

标签: system.reactive


【解决方案1】:

感谢您提供出色的示例代码。您需要使用出色的 MaterializeDematerialize 运算符。方法如下:

var observableFoundWithError =
    observableFound
        .Materialize()
        .Merge(
            observableError
                .Materialize()
                .Select(x =>
                    Notification
                        .CreateOnError<Service>(new Exception("Error"))))
        .Dematerialize()
        .Synchronize();

using (observableFoundWithError.Subscribe(
    s => Console.WriteLine($"Found service: {s.Name}"),
    ex => Console.WriteLine($"Found error: {ex.Message}"),
    () => Console.WriteLine("Found Completed")))
{
    browser.Search();
    Console.ReadLine();
}

Materialize() 运算符将IObservable&lt;T&gt; 转换为IObservable&lt;Notification&lt;T&gt;&gt;,这允许通过OnNext 调用发出标准OnErrorOnCompleted。您可以使用Notification.CreateOnError&lt;T&gt;(new Exception("Error")) 构造可观察的元素,您可以使用Dematerialize() 将其转回IObservable&lt;T&gt;

我已经抛出 Synchronize() 以确保您创建了一个有效的 observable。 Materialize() 的使用确实让您可以构建不遵循常规可观察合同的可观察对象。 Synchronize() 所做的部分工作就是确保只有一个 OnError 和一个 OnCompleted,并删除出现在两者之一之后的任何 OnNext


试试这个作为在 cmets 中做你想做的事情的一种方式:

static void Main(string[] args)
{
    var browser = new ServiceBrowser();

    var observableFound =
        Observable.FromEventPattern<ServiceFoundEventArgs>(
            h => browser.ServiceFound += h,
            h => browser.ServiceFound -= h)
        .Select(e => e.EventArgs.Service);

    var observableError =
        Observable.FromEventPattern<ServiceSearchErrorEventArgs>(
            h => browser.ServiceError += h,
            h => browser.ServiceError -= h);

    var observableFoundWithError = observableFound
        .Materialize()
        .Merge(
            observableError
                .Materialize()
                .Select(x => Notification.CreateOnError<Service>(new Exception("Error"))))
        .Dematerialize()
        .Synchronize();

    Func<Service, IObservable<Service>> resolveService = s =>
        Observable.Create<Service>(o =>
        {
            var observableResolved = Observable.FromEventPattern<EventArgs>(
                h => s.AddressResolved += h,
                h => s.AddressResolved -= h);

            var observableResolveError = Observable.FromEventPattern<EventArgs>(
                h => s.ErrorResolvingAddress += h,
                h => s.ErrorResolvingAddress -= h);

            var observableResolvedWithError =
                observableResolved
                    .Select(x => s)
                    .Materialize()
                    .Merge(
                        observableResolveError
                        .Do(e => Console.WriteLine($"Error resolving: {s.Name}"))
                        .Materialize()
                        .Select(x => Notification.CreateOnError<Service>(new Exception($"Error resolving address for service: {s.Name}"))))
                    .Dematerialize()
                    .Synchronize();

            s.ResolveAddress();

            return observableResolvedWithError.Subscribe(o);
        });

    using (
        observableFoundWithError
            .Select(s => resolveService(s))
            .Switch()
            .Subscribe(
                s => Console.WriteLine($"Found and resolved service: {s.Name} ({s.Address})"),
                ex => Console.WriteLine($"Found error: {ex.Message}"),
                () => Console.WriteLine("Found Completed")))
    {
        browser.Search();
        Console.ReadLine();
    }
}

public class ServiceBrowser
{
    public event EventHandler<ServiceFoundEventArgs> ServiceFound;
    public event EventHandler<ServiceSearchErrorEventArgs> ServiceError;

    public void Search() { }
}

public class Service
{
    public event EventHandler<EventArgs> AddressResolved;
    public event EventHandler<EventArgs> ErrorResolvingAddress;

    public string Name;
    public string Address;

    public void ResolveAddress() { }
}

public class ServiceFoundEventArgs : EventArgs
{
    public Service Service;
}

public class ServiceSearchErrorEventArgs : EventArgs
{

}

我可能需要进行一些调整——可能是一个Observable.Delay。让我知道它是否有效。

【讨论】:

  • 太棒了!这回答了我最初的问题 - 所以我会将其标记为已接受。但是,我确实忘记提及我还希望 Service 仅在解决后在可观察流中生成。所以一旦找到服务;我需要调用ResolveAddress 并在AddressResolved 上返回服务或在ErrorResolvingAddress 上抛出错误。所以我想我必须在找到服务时制作某种内部流(使用 Materialize/DeMaterialize)?
  • 你必须向我展示代码的正常执行,我才能回答。
  • @JonasFollesø - 我设法在平板电脑上偷偷摸摸了一段时间。我有一个更新的答案给你。它可以编译(我必须为此编写测试类),但我无法运行代码来查看它是否真的有效。
  • @JonasFollesø - 抱歉,我的答案似乎没有保存。太糟糕了。我必须再做一次。
  • 是的,在 subscribe 处理程序中进行查询感觉很臭……非常感谢您的帮助!
猜你喜欢
  • 2019-09-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多