【问题标题】:How to implement an IObserver with async/await OnNext/OnError/OnCompleted methods?如何使用 async/await OnNext/OnError/OnCompleted 方法实现 IObserver?
【发布时间】:2016-05-13 01:31:18
【问题描述】:

我正在尝试为 System.Net.WebSocket 编写一个扩展方法,该方法将使用 Reactive Extensions (Rx.NET) 将其转换为 IObserver。可以看下面的代码:

public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
    // Wrap the web socket in an interface that's a little easier to manage
    var webSocketMessageStream = new WebSocketMessageStream(webSocket);

    // Create the output stream to the client
    return Observer.Create<T>(
        onNext:      async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
        onError:     async error   => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
        onCompleted: async ()      => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
    );
}

此代码有效,但我担心在 onNext、onError 和 onCompleted lambda 中使用 async/await。我知道这会返回一个 async void lambda,这是不受欢迎的(有时会导致我自己已经遇到的问题)。

我一直在阅读 Rx.NET 文档以及互联网上的博客文章,但我似乎找不到在 IObserver 中使用 async/await 方法的正确方法(如果有的话)。有没有合适的方法来做到这一点?如果没有,那我该如何解决这个问题?

【问题讨论】:

  • 你的假设是错误的。实际上,您将Func&lt;Task&lt;T&gt;&gt;(或一些带参数的变体)传递给onNext 等。代表没有void 返回类型。他们返回任务。
  • 我不确定我是否关注你。我可以将异步方法分配给这样的 Action: Action test = async () => await DoSomethingAsync();这编译得很好,“测试”动作是不可等待的。调用 test() 在 DoSomethingAsync() 完成之前立即返回。 Observer.Create() 的方法定义接受 Actions 作为 onNext、onError 和 onCompleted 的参数。这就是我在上面发布的代码有效的原因。但是,当有人在我的 IObserver 上调用 OnNext(x) 时,它不会在返回之前等待方法的异步部分完成。
  • 糟糕。我的错。现在调查一下。
  • 一般指导 (IMO) 是您不要实施 IObserver&lt;T&gt; 或使用 Observer.Create。相反,您应该使用 Subscribe 扩展方法并将您的 lambdas 传递给它。
  • 为什么async 仍然在可观察对象上?调度程序允许您在没有编译器帮助的情况下进行异步。

标签: c# .net asynchronous task-parallel-library system.reactive


【解决方案1】:

订阅不采用async 方法。因此,这里发生的情况是您正在使用来自async void 的即发即弃机制。问题是 onNext 消息将不再被序列化。

您应该将其包装到管道中以允许 Rx 等待它,而不是在订阅中调用 async 方法。

onErroronCompleted 上使用即发即弃是可以的,因为它们保证是从Observable 调用的最后一件事。请记住,与Observable 关联的资源可以在onErroronCompleted 返回之后,在它们完成之前清理。

我写了一个小的扩展方法来完成这一切:

    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
    {
        return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
            .Subscribe(
            e => { }, // empty
            onError,
            onCompleted);
    }

首先,我们将async onNext 方法转换为Observable,连接两个异步世界。这意味着 onNext 中的 async/await 将受到尊重。接下来我们将该方法包装到Defer 中,这样它就不会在创建时立即启动。相反,Concat 只会在前一个完成时调用下一个延迟的 observable。

附言。我希望 Rx.Net 的下一个版本将在订阅中支持async

【讨论】:

  • “onNext 消息将不再被序列化”是什么意思?
  • Dorus 的意思是这里引入了两层异步。如果收到一条消息,它将调用WriteMessageAsync 方法。如果收到另一条消息,则无论先前的调用是否完成,它都会执行WriteMessageAsync。这意味着您可能会收到乱序消息(特别是如果 TaskPool 调度也参与其中)。即 onNext 消息将不再被序列化。
  • @Dorus,感谢您的回复。您介意编辑您的帖子以更详细地解释您的扩展方法的作用吗?我想知道的一件事是这是否真的会利用 async/await 或者它会在等待 OnNext 完成时阻塞线程?另外,我注意到 Rx.NET 在 NuGet 上提供了 v2.3 的 beta 版本,但我似乎找不到任何发行说明。你碰巧知道他们可能在哪里吗?
  • 我认为不会有另一个 2.x 版本的 Rx 发布。我相信 Bart 和团队目前正在开发 v3.0。希望我们能在接下来的几个月/几个季度中从他们身上看到一些东西。
  • 我认为他们目前正在避免它。 IMO 这有点像给孩子们锋利的刀。这些东西足够复杂,无需在查询中混合异步范例(Rx Schedulers vs TaskPool)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-06-13
  • 2016-02-18
  • 2018-06-29
  • 2021-03-28
  • 1970-01-01
  • 2019-01-17
  • 1970-01-01
相关资源
最近更新 更多