【问题标题】:GRPC async response stream C#GRPC 异步响应流 C#
【发布时间】:2017-12-05 04:27:11
【问题描述】:

如何从处理程序外部为 RPC 生成流式响应值? (具体来说,来自 IObservable)我目前正在执行以下操作,但这会产生跨线程问题,因为 AnRxObservable 在 RPC 处理程序之间共享...

public override Task GetTicker(RequestProto request, ServerCallContext context)
{
    var subscription = AnRxObservable.Subscribe(value =>
    {
        responseStream.WriteAsync(new ResponseProto
        {
            Value = value
        });
    });

    // Wait for the RPC to be canceled (my extension method
    // that returns a task that completes when the CancellationToken
    // is cancelled)
    await context.CancellationToken.WhenCancelled();

    // Dispose of the buffered stream
    bufferedStream.Dispose();

    // Dispose subscriber (tells rx that we aren't subscribed anymore)
    subscription.Dispose();

    return Task.FromResult(1);
}

这段代码感觉不对……但我看不到任何其他方式从 RPC 处理程序外部创建的共享源流式传输 RPC 响应。

【问题讨论】:

  • “导致跨线程问题”是什么意思?
  • observable 推送数据的线程不是调用 GetTicker 的线程(来自 GRPC 内部)。例如,observable 总是从线程 id 1 推送值,但 GRPC 将为每个请求在不同的线程上调用 GetTicker(来自我相信的线程池)。问题是当有两个并发的 GetTicker RPC 请求时......客户端将无法预测地停止接收流。 TLDR; GRPC 线程是安全的吗……看起来不是,但我找不到任何证据来支持这一点。

标签: c# grpc


【解决方案1】:

一般来说,当您尝试从推送模型(IObservable)转换为拉取模型(枚举响应以写入和写入它们)时,您需要一个中间缓冲区来存储消息 - 例如一个阻塞队列。然后,处理程序主体可以是一个异步循环,它尝试为队列获取下一条消息(最好以异步方式)并将其写入 responseStream。

另外,请注意 gRPC API 只允许您在任何给定时间有 1 个正在进行的响应 - 而您的 sn-p 不尊重这一点。所以你需要在开始另一个写入之前等待 WriteAsync() (这是你需要中间队列的另一个原因)。

此链接可能有助于解释推与拉范例:When to use IEnumerable vs IObservable?

【讨论】:

  • 我实际上确实遇到了 1 飞行中的问题,并实现了一个缓冲区 (gist.github.com/warrickw/f1795e188aeda5b59eae3e0ad786245a)...在我的情况下溢出了很多。 GRPC 是否在发送下一个值之前等待消息确认(可能是 200 毫秒后) - 有效地施加基于延迟的带宽瓶颈?
  • gRPC 等待确认写入操作,直到它通过网络发送消息(一旦 TCP/HTTP 流控制窗口允许) - 这是确保流控制所必需的(没有它,你会轻松淹没较慢的同行)。
猜你喜欢
  • 2014-07-22
  • 2014-06-29
  • 1970-01-01
  • 2023-03-07
  • 2021-03-29
  • 2018-08-25
  • 1970-01-01
  • 1970-01-01
  • 2019-07-28
相关资源
最近更新 更多