【发布时间】:2017-01-23 13:37:17
【问题描述】:
我正在尝试通过 GRPC 流公开一个 observable。 我的简化代码如下所示:
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var result = new Result();
try
{
await Observable.ForEachAsync(async value =>
{
await responseStream.WriteAsync(value);
});
}
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
}
我收到以下错误:
W0123 14:30:59.709715 Grpc.Core.Internal.ServerStreamingServerCallHandler
2 Exception occured in handler. System.ArgumentException: Der Wert liegt außerhalb des erwarteten Bereichs. bei Grpc.Core.Internal.ServerStreamingServerCallHandler2.d__4.MoveNext() W0123 14:30:59.732716 处理 RPC 时出现 Grpc.Core.Server 异常。 System.InvalidOperationException: Der Vorgang ist aufgrund des aktuellen Zustands des Objekts ungültig。北 Grpc.Core.Internal.AsyncCallServer2.SendStatusFromServerAsync(Status status, Metadata trailers, Tuple2 optionalWrite) bei Grpc.Core.Internal.ServerStreamingServerCallHandler`2.d__4.MoveNext() --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde --- bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务 任务)贝 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务 任务)bei Grpc.Core.Server.d__34.MoveNext()
您建议如何处理这个问题?我想我需要在同一个线程中处理 ForEachAsync。
【问题讨论】:
-
我已经转移到 BufferBlock 作为同步机制,而不是 rx。这似乎比使用带有调度的 rx 更干净、更容易。
标签: system.reactive grpc