【发布时间】:2016-05-13 01:31:18
【问题描述】:
我正在尝试为 System.Net.WebSocket 编写一个扩展方法,该方法将使用 Reactive Extensions (Rx.NET) 将其转换为 IObserver。可以看下面的代码:
public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
// Wrap the web socket in an interface that's a little easier to manage
var webSocketMessageStream = new WebSocketMessageStream(webSocket);
// Create the output stream to the client
return Observer.Create<T>(
onNext: async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
onError: async error => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
onCompleted: async () => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
);
}
此代码有效,但我担心在 onNext、onError 和 onCompleted lambda 中使用 async/await。我知道这会返回一个 async void lambda,这是不受欢迎的(有时会导致我自己已经遇到的问题)。
我一直在阅读 Rx.NET 文档以及互联网上的博客文章,但我似乎找不到在 IObserver 中使用 async/await 方法的正确方法(如果有的话)。有没有合适的方法来做到这一点?如果没有,那我该如何解决这个问题?
【问题讨论】:
-
你的假设是错误的。实际上,您将
Func<Task<T>>(或一些带参数的变体)传递给onNext等。代表没有void返回类型。他们返回任务。 -
我不确定我是否关注你。我可以将异步方法分配给这样的 Action: Action test = async () => await DoSomethingAsync();这编译得很好,“测试”动作是不可等待的。调用 test() 在 DoSomethingAsync() 完成之前立即返回。 Observer.Create() 的方法定义接受 Actions 作为 onNext、onError 和 onCompleted 的参数。这就是我在上面发布的代码有效的原因。但是,当有人在我的 IObserver 上调用 OnNext(x) 时,它不会在返回之前等待方法的异步部分完成。
-
糟糕。我的错。现在调查一下。
-
一般指导 (IMO) 是您不要实施
IObserver<T>或使用Observer.Create。相反,您应该使用Subscribe扩展方法并将您的 lambdas 传递给它。 -
为什么
async仍然在可观察对象上?调度程序允许您在没有编译器帮助的情况下进行异步。
标签: c# .net asynchronous task-parallel-library system.reactive