【问题标题】:Grpc - use rx observableGrpc - 使用 rx observable
【发布时间】:2017-01-23 13:37:17
【问题描述】:

我正在尝试通过 GRPC 流公开一个 observable。 我的简化代码如下所示:

public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
        {
            var result = new Result();
            try
            {
                await Observable.ForEachAsync(async value =>
                {
                    await responseStream.WriteAsync(value);
                });

            }
            catch (Exception ex)
            {
                Log.Info("Session ended:" + ex);
            }
        }

我收到以下错误:

W0123 14:30:59.709715 Grpc.Core.Internal.ServerStreamingServerCallHandler2 Exception occured in handler. System.ArgumentException: Der Wert liegt außerhalb des erwarteten Bereichs. bei Grpc.Core.Internal.ServerStreamingServerCallHandler2.d__4.MoveNext() W0123 14:30:59.732716 处理 RPC 时出现 Grpc.Core.Server 异常。 System.InvalidOperationException: Der Vorgang ist aufgrund des aktuellen Zustands des Objekts ungültig。北 Grpc.Core.Internal.AsyncCallServer2.SendStatusFromServerAsync(Status status, Metadata trailers, Tuple2 optionalWrite) bei Grpc.Core.Internal.ServerStreamingServerCallHandler`2.d__4.MoveNext() --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde --- bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务 任务)贝 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务 任务)bei Grpc.Core.Server.d__34.MoveNext()

您建议如何处理这个问题?我想我需要在同一个线程中处理 ForEachAsync。

【问题讨论】:

  • 我已经转移到 BufferBlock 作为同步机制,而不是 rx。这似乎比使用带有调度的 rx 更干净、更容易。

标签: system.reactive grpc


【解决方案1】:

使用 gRPC 流式处理 API,您一次只能写入一项。如果您在前一个操作完成之前启动另一个 WriteAsync() 操作,您将收到异常。您还需要在从方法处理程序(在本例中为 Feed 方法)返回之前完成所有写入。一次只允许一次写入的原因是为了确保 gRPC 的流控制工作良好。

在您的情况下,Rx API 似乎无法确保这一点,因此解决此问题的一种方法是使用中间缓冲区。

【讨论】:

  • 我同意这一点。另一个问题是如何确定 IServerStreamWriter 的生命周期,因为它没有实现 IDisposable。正如评论我已经开始使用 BufferBlock 作为异步 Fifo 队列作为同步我
  • @weismat:你能从 gRPC 流创建一个 observable 吗?我不明白 BufferBlock 如何帮助您实现这一目标。你能举个例子吗?谢谢。
【解决方案2】:

这个怎么样?

public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
    var result = new Result();
    try
    {
        await Observable.Scan(Task.CompletedTask, (preceding,value) =>
            preceding.ContinueWith(_ => responseStream.WriteAsync(value))
        );
    }
    catch (Exception ex)
    {
        Log.Info("Session ended:" + ex);
    }
}

我还没有测试过,但我认为它仍然有效。

【讨论】:

    【解决方案3】:

    有两个问题阻止了 OP 的方法按预期工作,这两个问题都源于 responseStream.WriteAsync 的预期,即一次只能挂起一个写入。

    1. 订阅 observable 时,每个“onNext”都将在引发通知的同一线程上处理。这意味着您可以强制执行一次写入的规则。您可以使用 ObserveOn 方法来控制如何安排每个通知。
    var eventLoop = new EventLoopScheduler();
    var o = myObservable.ObserveOn(eventLoop);
    
    1. ForEachAsync 无法正常工作,就像人们对异步 lambda 的看法一样。如果您将 async/await 放在此处,而是像这样调用 WriteAsync,则 Write 将在预期的上下文中执行。
     responseStream.WriteAsync<Response>(value).Wait();
    

    这是一个完整的例子...

    public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
    {
      var o = getMyObservable(request);
    
      try
      {
          var eventLoop = new EventLoopScheduler(); //just for example, use whatever scheduler makes sense for you.
          await o.ObserveOn(eventLoop).ForEachAsync<Response>(value =>
          {
              responseStream.WriteAsync(value).Wait();
          }, 
          context.CancellationToken);
    
      }
      catch (TaskCanceledException) { }
      catch (Exception ex)
      {
          Log.Info("Session ended:" + ex);
      }
    
      Log.Info("Session ended normally");
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-09-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多