【问题标题】:Fix faulty connections in Go修复 Go 中的错误连接
【发布时间】:2018-04-25 11:57:05
【问题描述】:

我有一个客户端正在向(从)服务器发送(接收)数据。客户端的代码类似:

conn, _ := net.Dial("tcp", "127.0.0.1:3456")
reader := bufio.NewReader(conn)
writer := bufio.NewWriter(conn)

for true {
   writer.write(data)
   reader.read()
}

现在,假设服务器经常崩溃导致conn 出现故障。这意味着for 循环中的writeread 方法不会做任何事情,只会返回error。即使服务器在接下来的几秒钟内再次启动,客户端的for 循环代码也将无法与服务器通信,因为conn 出现故障。

我想要实现的是:让客户端在服务器再次恢复时继续运行。为此,我想到了以下方法:

    func fixConnection(conn *net.Conn, reader **[]bufio.Reader, writer **[]bufio.Writer) net.Conn {

        for true {

            oneByte := make([] byte, 1, 1)
            reader := bufio.NewReader(*conn)
            _, err := reader.Read(oneByte)
            if err != nil {
                for true {
                    var tmpConn net.Conn
                    tmpConn, err = net.Dial("tcp", "127.0.0.1:3456")
                    if err == nil {
                        *conn = tmpConn
                        *reader = bufio.NewReader(*conn)
                        *writer = bufio.NewWriter(*conn)
                    }
                    time.Sleep(time.Millisecond * 100)
                }
            } else {
                reader.UnreadByte()
                time.Sleep(time.Millisecond * 500)
                continue
            }

        }
}

然后我只在客户端添加一行:

conn, _ := net.Dial("tcp", "127.0.0.1:3456")
reader := bufio.NewReader(conn)
writer := bufio.NewWriter(conn)

// new line
go fixConnection(&conn, &reader, &writer)

for true {
   writer.write(data)
   reader.read()
}

我的方法至少存在一个问题:bufio 不是线程安全的,所以当fixConnection 改变读者(作者)时,可能会出现问题。在对读写器进行操作之前,有没有办法在不使用sync.Mutex 的情况下解决这个问题。

另外,有没有更好的方法来解决我上面提到的问题。即,当服务器再次恢复时再次连接到服务器?请注意,服务器可以随时接受多个客户端的连接。

【问题讨论】:

  • 如果一端崩溃,TCP 连接将无法维持。您需要应用层重新连接功能。
  • 是的,这就是我想要做的。服务器可以即时接受新的客户端。
  • 到目前为止我知道,你不能轻易地读写。您必须写入并关闭连接以进行确认,然后重新打开连接并读取并等待对方关闭连接以进行确认。

标签: go server client


【解决方案1】:

也许只是检查错误,像这样

func connect(addr string)(*bufio.Reader, *bufio.Writer, Error){
    conn, err := net.Dial("tcp", addr)  reader := bufio.NewReader(conn)
    if err != nill{
        return(nil, nil, err)
    }
    writer := bufio.NewWriter(conn)
    reader := bufio.NewReader(conn)
    return(reader, writer, nil)
}
for reader, writer, err := connect(adrr);;{
    if err !=nil {
        reader, writer, err = connect(adrr)
        continue
    }
    _, err = writer.write(data)
    err = reader.read()
}

【讨论】:

    【解决方案2】:

    这是一种方法:

    package main
    
    import (
        "net"
        "sync"
        "time"
    )
    
    type ReconnectingWriter struct {
        Dialer func() (net.Conn, error)
    
        mu   sync.RWMutex
        conn net.Conn
    }
    
    func NewReconnectingWriter(dialer func() (net.Conn, error)) *ReconnectingWriter {
        return &ReconnectingWriter{Dialer: dialer}
    }
    
    func (rw *ReconnectingWriter) getConn() (net.Conn, error) {
        rw.mu.RLock()
        conn := rw.conn
        rw.mu.RUnlock()
    
        if conn != nil {
            return conn, nil
        }
    
        rw.mu.Lock()
        defer rw.mu.Unlock()
    
        var err error
        if rw.conn == nil {
            rw.conn, err = rw.Dialer()
        }
    
        return rw.conn, err
    }
    
    func (rw *ReconnectingWriter) closeConn() {
        rw.mu.Lock()
        defer rw.mu.Unlock()
    
        if rw.conn != nil {
            rw.conn.Close()
            rw.conn = nil
        }
    }
    
    func (rw *ReconnectingWriter) Write(b []byte) (int, error) {
        for i := 0; ; i++ {
            if i > 0 {
                time.Sleep(time.Second) // replace this with exp backoff + jitter
            }
    
            // try to get a connection
            conn, err := rw.getConn()
            if err != nil {
                continue
            }
    
            // try to write the data
            n, err := conn.Write(b)
            if err != nil {
                rw.closeConn()
                continue
            }
    
            return n, err
        }
    }
    
    func main() {
        rw := NewReconnectingWriter(func() (net.Conn, error) {
            return net.Dial("tcp", "localhost:9000")
        })
        rw.Write([]byte("hello world"))
    }
    

    您不应该这样做,因为您最终会得到部分写入。如果这是一个 http 服务器,更好的解决方案是使用负载均衡器,因为它可以正确地重放整个请求。 (haproxy 或 envoy 是选项)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-02-14
      • 1970-01-01
      • 2020-04-20
      • 1970-01-01
      • 1970-01-01
      • 2019-04-30
      • 2019-09-15
      • 2020-05-22
      相关资源
      最近更新 更多