【问题标题】:how to implement status polling using Rx.Net?如何使用 Rx.Net 实现状态轮询?
【发布时间】:2017-11-30 15:08:50
【问题描述】:

我已经玩了 2 天了,现在尝试使用 .Net 的响应式扩展来拥抱响应式编程领域。

我构建了一个状态轮询用例,假设一个虚拟 Web API 和一个轮询状态对象的反应式客户端。

我尝试了以下代码:

        // Creates an observable that ticks each 1 second
        var ticksObservable = Observable.Interval(TimeSpan.FromMilliseconds(1000));



        // Creates a new observable transforming each tick to a string status requested from the api
        var coldStatusPollerObservable = ticksObservable.Select(tick =>
        {
            Console.WriteLine("Sending Request");
            var tsk = client.GetStatus(1); // Http get request to a web api resource (id == 1 just for demo)
            tsk.Wait();
            return tsk.Result;

        }
        );

       // Subscribe and print results on console
       coldStatusPollerObservable.Subscribe(
        status => Console.WriteLine(status), ex => Console.WriteLine(ex.Message)
        );

一切都很好,我得到了预期的输出:

{"status":"waiting"}
{"status":"running"}
{"status":"running"}
{"status":"running"}
{"status":"ok"}

然后我添加了另一个约束,即从 Web API 返回的随机错误请求。 发生的问题是我无法正确处理异常。 异常发生在 tsk.wait() 中,我期望它只会触发我传递给订阅方法 ( ex => Console 的 onError 操作.WriteLine(ex.Message) )

Q1:在这种情况下处理异常的正确方法是什么? Q2:使用 Rx.NET 是否有更简洁的轮询实现?

PS:我使用的是 Rx.NET 3.1.1

【问题讨论】:

    标签: c# system.reactive reactive-programming polling


    【解决方案1】:

    如果可以的话,您基本上希望避免使用任何 .Wait() 阻塞调用。 Rx 带有一个专门用于处理任务的运算符 - Observable.FromAsync

    所以你的基本查询现在变成了:

    var coldStatusPollerObservable =
        from tick in ticksObservable
        from status in Observable.FromAsync(() => client.GetStatus(1))
        select status;
    

    如果您想要控制台消息,请执行以下操作:

    var coldStatusPollerObservable =
        from tick in ticksObservable
        from status in Observable.FromAsync(() =>
        {
            Console.WriteLine("Sending Request");
            return client.GetStatus(1);
        })
        select status;
    

    请记住,尽可能坚持使用内置运算符。


    您可以像这样处理异常:

    void Main()
    {
        var coldStatusPollerObservable =
            from tick in Observable.Interval(TimeSpan.FromMilliseconds(1000))
            from status in
                Observable
                    .FromAsync(() => client.GetStatus(1))
                    .Catch<string, Exception>(ex => Observable.Return("Error"))
            select status;
    
        coldStatusPollerObservable.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
    }
    
    public static class client
    {
        private static int _counter = 0;
        public static Task<string> GetStatus(int id)
        {
            if (_counter++ == 5)
                throw new Exception();
            return Task.Run(() => _counter.ToString());
        }
    }
    

    这给出了:

    1 2 3 4 5 错误 7 8 9 10 11 12 13 14 15 ...

    【讨论】:

    • 关于重构异步调用的好方法。但它如何影响 Observable 中的异常处理?
    • 按预期工作。谢谢您的帮助。顺便提一下 object.Dump() 扩展方法是特定于 LINQ pad 的。
    • @AnisTissaoui - 谢谢。我删除了.Dump() 调用。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-01-08
    • 2016-03-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-11-12
    相关资源
    最近更新 更多