【发布时间】:2017-12-05 04:27:11
【问题描述】:
如何从处理程序外部为 RPC 生成流式响应值? (具体来说,来自 IObservable)我目前正在执行以下操作,但这会产生跨线程问题,因为 AnRxObservable 在 RPC 处理程序之间共享...
public override Task GetTicker(RequestProto request, ServerCallContext context)
{
var subscription = AnRxObservable.Subscribe(value =>
{
responseStream.WriteAsync(new ResponseProto
{
Value = value
});
});
// Wait for the RPC to be canceled (my extension method
// that returns a task that completes when the CancellationToken
// is cancelled)
await context.CancellationToken.WhenCancelled();
// Dispose of the buffered stream
bufferedStream.Dispose();
// Dispose subscriber (tells rx that we aren't subscribed anymore)
subscription.Dispose();
return Task.FromResult(1);
}
这段代码感觉不对……但我看不到任何其他方式从 RPC 处理程序外部创建的共享源流式传输 RPC 响应。
【问题讨论】:
-
“导致跨线程问题”是什么意思?
-
observable 推送数据的线程不是调用 GetTicker 的线程(来自 GRPC 内部)。例如,observable 总是从线程 id 1 推送值,但 GRPC 将为每个请求在不同的线程上调用 GetTicker(来自我相信的线程池)。问题是当有两个并发的 GetTicker RPC 请求时......客户端将无法预测地停止接收流。 TLDR; GRPC 线程是安全的吗……看起来不是,但我找不到任何证据来支持这一点。