【问题标题】:How can I ensure that a spawned go routine finishes processing an array on program terminiation如何确保 spawn goroutine 在程序终止时完成对数组的处理
【发布时间】:2022-01-27 22:18:10
【问题描述】:

我正在处理来自 kafka 主题的记录。我需要将这些记录发送到的端点支持发送最多 100 条记录的数组。 kafka 记录还包含执行 rest 调用的信息(目前只有 1 到 2 个变体,但随着处理不同记录类型的数量会增加)。我目前正在加载唯一配置的结构数组,当它们被发现时,这些配置中的每一个都有自己的队列数组。对于每个配置,我都会生成一个新的 go 例程,它将在计时器上处理其队列中的任何记录(例如 100 毫秒)。这个过程目前工作得很好。我遇到的问题是程序关闭时。我不想在队列中留下任何未发送的记录,并希望在应用程序关闭之前完成处理它们。下面的当前代码处理中断并开始检查队列深度,但是一旦中断发生,队列计数就不会减少,因此程序永远不会终止。任何想法将不胜感激。

package main

import (
    "context"
    "encoding/json"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"
    _ "time/tzdata"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

type ChannelDetails struct {
    ChannelDetails MsgChannel
    LastUsed       time.Time
    Active         bool
    Queue          []OutputMessage
}

type OutputMessage struct {
    Config  MsgConfig `json:"config"`
    Message string    `json:"message"`
}

type MsgConfig struct {
    Channel MsgChannel `json:"channel"`
}

type MsgChannel struct {
    Id      int    `json:"id"`
    MntDate string `json:"mntDate"`
    Otype   string `json:"oType"`
}

var channels []ChannelDetails

func checkQueueDepths() int {
    var depth int = 0
    for _, c := range channels {
        depth += len(c.Queue)
    }
    return depth
}

func TimeIn(t time.Time, name string) (time.Time, error) {
    loc, err := time.LoadLocation(name)
    if err == nil {
        t = t.In(loc)
    }
    return t, err
}

func find(channel *MsgChannel) int {
    for i, c := range channels {
        if c.ChannelDetails.Id == channel.Id &&
            c.ChannelDetails.MntDate == channel.MntDate {
            return i
        }
    }
    return len(channels)
}

func splice(queue []OutputMessage, count int) (ret []OutputMessage, deleted []OutputMessage) {
    ret = make([]OutputMessage, len(queue)-count)
    deleted = make([]OutputMessage, count)
    copy(deleted, queue[0:count])
    copy(ret, queue[:0])
    copy(ret[0:], queue[0+count:])
    return
}

func load(msg OutputMessage, logger *zap.Logger) {

    i := find(&msg.Config.Channel)

    if i == len(channels) {
        channels = append(channels, ChannelDetails{
            ChannelDetails: msg.Config.Channel,
            LastUsed:       time.Now(),
            Active:         false,
            Queue:          make([]OutputMessage, 0, 200),
        })
    }
    channels[i].LastUsed = time.Now()
    channels[i].Queue = append(channels[i].Queue, msg)
    if !channels[i].Active {
        channels[i].Active = true
        go process(&channels[i], logger)
    }
}

func process(data *ChannelDetails, logger *zap.Logger) {
    for {
        // if Queue is empty and not used for 5 minutes, flag as inActive and shut down go routine
        if len(data.Queue) == 0 &&
            time.Now().After(data.LastUsed.Add(time.Second*10)) { //reduced for example
            data.Active = false
            logger.Info("deactivating routine as queue is empty")
            break
        }

        // if Queue has records, process
        if len(data.Queue) != 0 {
            drainStart, _ := TimeIn(time.Now(), "America/New_York")
            spliceCnt := len(data.Queue)
            if spliceCnt > 100 {
                spliceCnt = 100 // rest api endpoint can only accept array up to 100 items
            }
            items := []OutputMessage{}
            data.Queue, items = splice(data.Queue, spliceCnt)
            //process items ... will send array of items to a rest endpoint in another go routine
            drainEnd, _ := TimeIn(time.Now(), "America/New_York")
            logger.Info("processing records",
                zap.Int("numitems", len(items)),
                zap.String("start", drainStart.Format("2006-01-02T15:04:05.000-07:00")),
                zap.String("end", drainEnd.Format("2006-01-02T15:04:05.000-07:00")),
            )

        }

        time.Sleep(time.Millisecond * time.Duration(500))
    }
}

func initZapLog() *zap.Logger {
    config := zap.NewProductionConfig()
    config.EncoderConfig.TimeKey = "timestamp"
    config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
    logger, _ := config.Build()
    zap.ReplaceGlobals(logger)
    return logger
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    logger := initZapLog()
    defer logger.Sync()

    test1 := `{
        "config": {
            "channel": {
                "id": 1,
                "mntDate": "2021-12-01",
                "oType": "test1"
            }
        },
        "message": "test message1"
    }`
    test2 := `{
        "config": {
            "channel": {
                "id": 2,
                "mntDate": "2021-12-01",
                "oType": "test2"
            }
        },
        "message": "test message2"
    }`
    var testMsg1 OutputMessage
    err := json.Unmarshal([]byte(test1), &testMsg1)
    if err != nil {
        logger.Panic("unable to unmarshall test1 data " + err.Error())
    }
    var testMsg2 OutputMessage
    err = json.Unmarshal([]byte(test2), &testMsg2)
    if err != nil {
        logger.Panic("unable to unmarshall test2 data " + err.Error())
    }

    exitCh := make(chan struct{})
    go func(ctx context.Context) {
        for {
            //original data is streamed from kafka
            load(testMsg1, logger)
            load(testMsg2, logger)

            time.Sleep(time.Millisecond * time.Duration(5))
            select {
            case <-ctx.Done():
                logger.Info("received done")
                var depthChk int
                for {
                    depthChk = checkQueueDepths()
                    if depthChk == 0 {
                        break
                    } else {
                        logger.Info("Still processing queues.  Msgs left: " + strconv.Itoa(depthChk))
                    }
                    time.Sleep(100 * time.Millisecond)
                }
                exitCh <- struct{}{}
                return
            default:
            }
        }
    }(ctx)

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigs
        depths := checkQueueDepths()
        logger.Info("You pressed ctrl + C. Queue depth is: " + strconv.Itoa(depths))
        cancel()
    }()
    <-exitCh
}

示例日志:

{"level":"info","timestamp":"2021-12-28T15:26:06.136-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":91,"start":"2021-12-28T15:26:06.136-05:00","end":"2021-12-28T15:26:06.136-05:00"}
{"level":"info","timestamp":"2021-12-28T15:26:06.636-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":92,"start":"2021-12-28T15:26:06.636-05:00","end":"2021-12-28T15:26:06.636-05:00"}
^C{"level":"info","timestamp":"2021-12-28T15:26:06.780-0500","caller":"testgo/main.go:205","msg":"You pressed ctrl + C. Queue depth is: 2442"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:182","msg":"received done"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:189","msg":"Still processing queues.  Msgs left: 2442"} --line repeats forever

【问题讨论】:

  • "你按 ctrl + C" 其实你不知道,只是你收到了一个 SIGINT 或 SIGTERM。 os.Interrupt 是 syscall.SIGINT 的别名,顺便说一下 (pkg.go.dev/os#Signal)

标签: go


【解决方案1】:

sync golang 包https://pkg.go.dev/sync 具有等待组类型,允许您在主例程返回之前等待一组 go 例程完成。

最佳用法示例在此博客文章中: https://go.dev/blog/pipelines

【讨论】:

  • 这几乎可以肯定是最好的答案,我相信提问者可以从这里弄清楚。但是,提供一个示例或有关使用它的更多信息也没有什么坏处。
【解决方案2】:

要“等待”从主 goroutine 内部生成的所有 goroutine 完成,有两种方法可以做到这一点。最简单的方法是添加一个

runtime.Goexit() 

到你的主协程的末尾,在 之后

简单地说,它就是这样做的:

“从主 goroutine 调用 Goexit 将终止该 goroutine 而 func main 没有返回。由于 func main 没有返回,程序继续执行其他 goroutine。如果所有其他 goroutine 退出,程序崩溃。”

另一种方法是使用等待组,将等待组视为计数器,程序将在调用该方法的行“等待”直到计数器达到零:

var wg sync.WaitGroup // declare the waitgroup

然后在您要等待的每个 goroutine 中,添加/增加等待组:

wg.Add() // you typically call this for each spawned goroutine

然后当你想声明 goroutine 已经完成工作时,你调用

wg.Done() // when you consider the spawned routine to be done call this

计数器递减

然后你希望代码“等待”直到计数器为零,你添加行:

wg.Wait() // wait here till counter hits zero

并且代码将阻塞,直到使用 Add() 计数并使用 Done() 递减的 goroutine 数量达到零

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-04-03
    • 2015-12-12
    • 2018-10-10
    • 2018-04-24
    • 1970-01-01
    • 2017-08-02
    • 2012-04-30
    • 2012-01-06
    相关资源
    最近更新 更多