【发布时间】:2022-01-27 22:18:10
【问题描述】:
我正在处理来自 kafka 主题的记录。我需要将这些记录发送到的端点支持发送最多 100 条记录的数组。 kafka 记录还包含执行 rest 调用的信息(目前只有 1 到 2 个变体,但随着处理不同记录类型的数量会增加)。我目前正在加载唯一配置的结构数组,当它们被发现时,这些配置中的每一个都有自己的队列数组。对于每个配置,我都会生成一个新的 go 例程,它将在计时器上处理其队列中的任何记录(例如 100 毫秒)。这个过程目前工作得很好。我遇到的问题是程序关闭时。我不想在队列中留下任何未发送的记录,并希望在应用程序关闭之前完成处理它们。下面的当前代码处理中断并开始检查队列深度,但是一旦中断发生,队列计数就不会减少,因此程序永远不会终止。任何想法将不胜感激。
package main
import (
"context"
"encoding/json"
"os"
"os/signal"
"strconv"
"syscall"
"time"
_ "time/tzdata"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
type ChannelDetails struct {
ChannelDetails MsgChannel
LastUsed time.Time
Active bool
Queue []OutputMessage
}
type OutputMessage struct {
Config MsgConfig `json:"config"`
Message string `json:"message"`
}
type MsgConfig struct {
Channel MsgChannel `json:"channel"`
}
type MsgChannel struct {
Id int `json:"id"`
MntDate string `json:"mntDate"`
Otype string `json:"oType"`
}
var channels []ChannelDetails
func checkQueueDepths() int {
var depth int = 0
for _, c := range channels {
depth += len(c.Queue)
}
return depth
}
func TimeIn(t time.Time, name string) (time.Time, error) {
loc, err := time.LoadLocation(name)
if err == nil {
t = t.In(loc)
}
return t, err
}
func find(channel *MsgChannel) int {
for i, c := range channels {
if c.ChannelDetails.Id == channel.Id &&
c.ChannelDetails.MntDate == channel.MntDate {
return i
}
}
return len(channels)
}
func splice(queue []OutputMessage, count int) (ret []OutputMessage, deleted []OutputMessage) {
ret = make([]OutputMessage, len(queue)-count)
deleted = make([]OutputMessage, count)
copy(deleted, queue[0:count])
copy(ret, queue[:0])
copy(ret[0:], queue[0+count:])
return
}
func load(msg OutputMessage, logger *zap.Logger) {
i := find(&msg.Config.Channel)
if i == len(channels) {
channels = append(channels, ChannelDetails{
ChannelDetails: msg.Config.Channel,
LastUsed: time.Now(),
Active: false,
Queue: make([]OutputMessage, 0, 200),
})
}
channels[i].LastUsed = time.Now()
channels[i].Queue = append(channels[i].Queue, msg)
if !channels[i].Active {
channels[i].Active = true
go process(&channels[i], logger)
}
}
func process(data *ChannelDetails, logger *zap.Logger) {
for {
// if Queue is empty and not used for 5 minutes, flag as inActive and shut down go routine
if len(data.Queue) == 0 &&
time.Now().After(data.LastUsed.Add(time.Second*10)) { //reduced for example
data.Active = false
logger.Info("deactivating routine as queue is empty")
break
}
// if Queue has records, process
if len(data.Queue) != 0 {
drainStart, _ := TimeIn(time.Now(), "America/New_York")
spliceCnt := len(data.Queue)
if spliceCnt > 100 {
spliceCnt = 100 // rest api endpoint can only accept array up to 100 items
}
items := []OutputMessage{}
data.Queue, items = splice(data.Queue, spliceCnt)
//process items ... will send array of items to a rest endpoint in another go routine
drainEnd, _ := TimeIn(time.Now(), "America/New_York")
logger.Info("processing records",
zap.Int("numitems", len(items)),
zap.String("start", drainStart.Format("2006-01-02T15:04:05.000-07:00")),
zap.String("end", drainEnd.Format("2006-01-02T15:04:05.000-07:00")),
)
}
time.Sleep(time.Millisecond * time.Duration(500))
}
}
func initZapLog() *zap.Logger {
config := zap.NewProductionConfig()
config.EncoderConfig.TimeKey = "timestamp"
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
logger, _ := config.Build()
zap.ReplaceGlobals(logger)
return logger
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
logger := initZapLog()
defer logger.Sync()
test1 := `{
"config": {
"channel": {
"id": 1,
"mntDate": "2021-12-01",
"oType": "test1"
}
},
"message": "test message1"
}`
test2 := `{
"config": {
"channel": {
"id": 2,
"mntDate": "2021-12-01",
"oType": "test2"
}
},
"message": "test message2"
}`
var testMsg1 OutputMessage
err := json.Unmarshal([]byte(test1), &testMsg1)
if err != nil {
logger.Panic("unable to unmarshall test1 data " + err.Error())
}
var testMsg2 OutputMessage
err = json.Unmarshal([]byte(test2), &testMsg2)
if err != nil {
logger.Panic("unable to unmarshall test2 data " + err.Error())
}
exitCh := make(chan struct{})
go func(ctx context.Context) {
for {
//original data is streamed from kafka
load(testMsg1, logger)
load(testMsg2, logger)
time.Sleep(time.Millisecond * time.Duration(5))
select {
case <-ctx.Done():
logger.Info("received done")
var depthChk int
for {
depthChk = checkQueueDepths()
if depthChk == 0 {
break
} else {
logger.Info("Still processing queues. Msgs left: " + strconv.Itoa(depthChk))
}
time.Sleep(100 * time.Millisecond)
}
exitCh <- struct{}{}
return
default:
}
}
}(ctx)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
depths := checkQueueDepths()
logger.Info("You pressed ctrl + C. Queue depth is: " + strconv.Itoa(depths))
cancel()
}()
<-exitCh
}
示例日志:
{"level":"info","timestamp":"2021-12-28T15:26:06.136-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":91,"start":"2021-12-28T15:26:06.136-05:00","end":"2021-12-28T15:26:06.136-05:00"}
{"level":"info","timestamp":"2021-12-28T15:26:06.636-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":92,"start":"2021-12-28T15:26:06.636-05:00","end":"2021-12-28T15:26:06.636-05:00"}
^C{"level":"info","timestamp":"2021-12-28T15:26:06.780-0500","caller":"testgo/main.go:205","msg":"You pressed ctrl + C. Queue depth is: 2442"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:182","msg":"received done"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:189","msg":"Still processing queues. Msgs left: 2442"} --line repeats forever
【问题讨论】:
-
"你按 ctrl + C" 其实你不知道,只是你收到了一个 SIGINT 或 SIGTERM。 os.Interrupt 是 syscall.SIGINT 的别名,顺便说一下 (pkg.go.dev/os#Signal)
标签: go