【发布时间】:2017-10-25 13:18:35
【问题描述】:
从 grpc ,客户端可以调用 CloseSend 关闭流到服务器,但似乎服务器无法切断与客户端的连接。
【问题讨论】:
-
你为什么知道?什么错误?
-
@jim 你能找到任何解决方案吗?
从 grpc ,客户端可以调用 CloseSend 关闭流到服务器,但似乎服务器无法切断与客户端的连接。
【问题讨论】:
首先,您不应该从连接的角度考虑 gRPC。
一旦处理程序返回error,ServerStream 将停止发送数据。服务端不需要CloseSend这样的函数。
如果你想在返回后执行一些逻辑,“只需”生成一个 goroutine。
【讨论】:
解决这个问题的最好方法是使用三个 goroutine。
问题是双向处理程序将在 Recv() 中被阻塞,因此当它想要完成流时不能轻易“返回给调用者”。
另一个问题是,如果网络条件不好,或者另一端不合作,Send() 可能会阻塞(这是您首先要终止连接的一个非常常见的原因!)
通常,您还希望从接收者循环之外向发送者发送消息——使用双向流的典型情况是与世界其他地方进行实际通信。对于更加隔离的请求/响应模式,其他模式通常更易于使用。允许同步发送/接收循环之外的人对消息进行排队会导致额外的同步。
因此,您可以像这样在 Bidi 处理程序中构建代码:
现在,当您想尝试发送给发送者 chan 时,请在 select{} 中执行此操作,如果传出 chan 已满,它将转到 default。您可以将此用作另一端应该断开连接的信号,或者只是丢弃消息以避免无限缓冲,具体取决于需要。
当想要断开客户端与服务器端的连接时,您应该取消阻塞双向处理程序正在阻塞的等待原语。这最终会使 Send() 和 Recv() 函数返回错误,之后您可以安全地声明连接“完成”。
还有远程客户端关闭的常规情况,Recv()调用返回错误,在这种情况下你也需要关闭它。
如果您想真正关闭所涉及的通道,并支持上下文取消以及断开无响应客户端的连接,您最终将不得不将通道与其他互斥锁同步,以避免意外发送已关闭的通道 - 没有否则可以安全地执行此操作,因为 Send()/Recv() 函数在双向处理程序已经返回之前不会解除阻塞。 (如果没有外部发送者,发送者的所有排队都发生在接收 goroutine 中,这不是问题。不过,这仅适用于非常简单的服务器。)
有点伪代码:
func (m *MyThing) MyBidiServer(stream somepb.Thing_ThingServer) {
can := make(chan struct{})
thingRunner := &ThingRunner{
cancel: can,
send: make(chan *somepb.OutgoingMessage, 100),
}
m.addRunner(thingRunner) // assume this is mutexed
defer m.removeRunner(thingRunner)
go thingRunner.recvLoop(stream)
go thingRunner.sendLoop(stream)
// could be select here if you also have a context
<-can
}
func (t *ThingRunner) recvLoop(stream somepb.Thing_ThingServer) {
for {
msg, err := stream.Recv()
if err != nil {
break
}
}
t.cancelSafely() // in case client disconnected
}
func (t *ThingRunner) sendLoop(stream somepb.Thing_ThingServer) {
for msg := range t.send {
if err := stream.Send(msg); err != nil {
break
}
}
t.cancelSafely() // in case of network error
}
func (t *ThingRunner) SendMessage(ctx context.Context, msg *somepb.Message) error {
if t.isCanceled() {
return ThingClosedError
}
select {
case t.send <- msg:
// all is well
case <-ctx.Done():
return ctx.Err()
default:
// the client is too slow -- disconnect it
t.cancelSafely()
return ClientTooSlowError
}
return nil
}
您还需要处理一些小的管理方面的问题,但这就是要点。 cancelSafely() 需要解除对 cancel chan 的阻塞,但它也需要不导致 t.send 上的未来写入恐慌,因为在关闭的 chan 上发送将无条件恐慌。
【讨论】: