【问题标题】:Re-creating mgo sessions in case of errors (read tcp 127.0.0.1:46954->127.0.0.1:27017: i/o timeout)出现错误时重新创建 mgo 会话(读取 tcp 127.0.0.1:46954->127.0.0.1:27017: i/o timeout)
【发布时间】:2017-04-03 14:14:04
【问题描述】:

我想知道 Go 中使用 mgo 的 MongoDB 会话管理,尤其是关于如何正确确保会话关闭以及如何对写入失败做出反应。

我已阅读以下内容:

Best practice to maintain a mgo session

Should I copy session for each operation in mgo?

仍然无法将其应用于我的情况。

我有两个 goroutine 将一个又一个事件存储到 MongoDB 中,共享同一个 *mgo.Session,两者看起来都像下面这样:

func storeEvents(session *mgo.Session) {
    session_copy := session.Copy()
    // *** is it correct to defer the session close here? <-----
    defer session_copy.Close()
    col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
    for {
        event := GetEvent()
        err := col.Insert(&event)
        if err != nil {
            // *** insert FAILED - how to react properly? <-----
            session_copy = session.Copy()
            defer session_copy.Close()
        }
    }
}

col.Insert(&event) 几个小时后返回错误

read tcp 127.0.0.1:46954->127.0.0.1:27017: i/o timeout

我不确定如何对此做出反应。发生此错误后,它会发生在所有后续写入中,因此似乎我必须创建一个新会话。对我来说似乎有替代方案:

1) 重启整个 goroutine,即

if err != nil {
    go storeEvents(session)
    return
}

2) 创建一个新的会话副本

if err != nil {
    session_copy = session.Copy()
    defer session_copy.Close()
    col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
    continue
}

--> 我如何使用defer session_copy.Close() 是否正确? (注意上面的 defer 引用了另一个会话的 Close() 函数。无论如何,这些会话永远不会关闭,因为该函数永远不会返回。也就是说,随着时间的推移,许多会话将被创建而不是关闭。

其他选项?

【问题讨论】:

  • 你应该看看这里blog.golang.org/defer-panic-and-recover.. 基本上如果 err != nil 你应该恐慌 err,然后通过恢复来捕获它,创建会话的副本并将该副本推迟到会议。您已经使用延迟正确关闭了连接。但是,如果您在插入时抛出错误,您还应该调查原因,而不是依赖尝试重新插入。我已经使用 mgo 完成了数百万次插入,并且从未抛出错误。事实上,我遇到的大多数 Mongo 错误都与 Mongo 本身有关,而不是驱动程序。
  • @reticentroot 感谢有关延迟的信息;关于插入错误背后的原因:你有什么建议去哪里看吗?我已经搜索了错误信息,在stackoverflow上找到了帖子,表明这是因为超时;对我来说,超时似乎不太可能,因为a)插入相当小,并且b)一旦发生错误,它会在每个后续插入中重新发生;尽管如此,我还是编写了代码来测量插入时间并记录它以防返回错误,但从那时起(昨天),还没有再次发生此类错误
  • 检查你的 mongod 实例。如果您打开更多连接然后您的实例可以处理,它可能会超时。您可以将错误添加到您的帖子中吗?
  • @reticentroot mongodb 日志中没有错误;只有许多“已接受连接”,然后是“结束连接”日志条目(时间戳仅间隔 10 毫秒);对于失败的插入,没有提到插入 - 通常,对于插入,日志有一个条目“插入 . 查询:
  • 实际上,在发生错误的原始设置中,我直接从两个不同的 goroutine 使用会话指针,而没有在每个 goroutine 中创建副本;现在,由于我在 storeEvents() 函数中更改为上面显示的代码,因此尚未出现错误;我会等到星期一,看看工作日会发生什么,还有更多活动

标签: mongodb go mgo


【解决方案1】:

所以我不知道这是否会对您有所帮助,但我对此设置没有任何问题。

我有一个从中导入的 mongo 包。这是我的 mongo.go 文件的模板

package mongo

import (
    "time"

    "gopkg.in/mgo.v2"
)

var (
    // MyDB ...
    MyDB DataStore
)

// create the session before main starts
func init() {
    MyDB.ConnectToDB()
}

// DataStore containing a pointer to a mgo session
type DataStore struct {
    Session *mgo.Session
}

// ConnectToTagserver is a helper method that connections to pubgears' tagserver
// database
func (ds *DataStore) ConnectToDB() {
    mongoDBDialInfo := &mgo.DialInfo{
        Addrs:    []string{"ip"},
        Timeout:  60 * time.Second,
        Database: "db",
    }
    sess, err := mgo.DialWithInfo(mongoDBDialInfo)
    if err != nil {
        panic(err)
    }
    sess.SetMode(mgo.Monotonic, true)
    MyDB.Session = sess
}

// Close is a helper method that ensures the session is properly terminated
func (ds *DataStore) Close() {
    ds.Session.Close()
}

然后在另一个包中,例如 main 根据下面的评论更新

package main

import (
    "../models/mongo"
)

func main() {
    // Grab the main session which was instantiated in the mongo package init function
    sess := mongo.MyDB.Session
    // pass that session in
    storeEvents(sess)
}

func storeEvents(session *mgo.Session) {
    session_copy := session.Copy()
    defer session_copy.Close()

    // Handle panics in a deferred fuction
    // You can turn this into a wrapper (middleware)
    // remove this this function, and just wrap your calls with it, using switch cases
    // you can handle all types of errors
    defer func(session *mgo.Session) {
        if err := recover(); err != nil {
            fmt.Printf("Mongo insert has caused a panic: %s\n", err)
            fmt.Println("Attempting to insert again")
            session_copy := session.Copy()
            defer session_copy.Close()
            col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
            event := GetEvent()
            err := col.Insert(&event)
            if err != nil {
                fmt.Println("Attempting to insert again failed")
                return
            }
            fmt.Println("Attempting to insert again succesful")
        }
    }(session)

    col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
    event := GetEvent()
    err := col.Insert(&event)
    if err != nil {
        panic(err)
    }
}

我在 AWS 上的生产服务器上使用了类似的设置。我每小时插入超过 100 万次。希望这可以帮助。我为确保 mongo 服务器可以处理连接所做的另一件事是在我的生产机器上创建 ulimit。在这个stack中讲到了

【讨论】:

  • storeEvents() 只是我在上面发布的代码的一个副本 - 您不必对错误感到恐慌,并按照您对我的帖子的评论中的建议使用 Recover 创建一个新会话
  • 那是因为在我的生产代码中,我使用了一个使用 gollira mux 和自定义处理程序(中间件)的 go 服务器,它包装了我的路由并在出现紧急情况时恢复服务器。而在您的代码中,我没有看到您是否使用了 go 服务器。所以我的恢复示例可能不适合你。您必须编写自己的恢复方法。是的,我在上面复制了您的代码,以便您可以看到我如何将我的示例与您的代码一起使用,这是给某人示例时通常要做的事情。最后要注意的主要事情是使用保存我的会话和初始化函数的结构。
  • @alex,我已经更新了上面的代码以提供恢复的示例,如果你要使用 Go,你仍然应该自己研究并了解它。如果您要使用其他语言,这与尝试和捕获异常相同。您不希望像上面使用 for {} 那样在无限循环中创建会话,因为您可能会产生太多的连接,而且您的中断条件总是有可能无法满足。如果插入导致超时,音量低,那么 mongo 服务器的设置存在严重错误。
猜你喜欢
  • 2015-11-07
  • 2023-02-24
  • 2015-09-15
  • 1970-01-01
  • 2020-05-01
  • 1970-01-01
  • 2014-02-27
  • 1970-01-01
  • 2016-07-01
相关资源
最近更新 更多