【问题标题】:Large number of transient objects - avoiding contention大量瞬态对象 - 避免争用
【发布时间】:2017-01-20 02:40:00
【问题描述】:

我有一个用 Go 编写的新 TCP 服务器,它连接了 100 多个客户端。每个客户端都输入需要集中查看的数据,因为他们正在查看来自不同位置的无线电数据包,然后进行分析。该代码有效,但我看到很多争用和锁定周围的 CPU 增加,并且在考虑如何避免锁定(如果可能)或围绕它进行优化。

由于 TCP 服务器为收到的每个数据包启动 GoRoutine,addMessage 函数需要一定程度的同步。稍后还会在另一个函数中分析这些数据包,该函数在地图上执行 RLock()

这是每秒调用一次的 cullMessages() 函数,它真正被自己赶上并且可能真的变慢,有时需要 2-3 秒才能运行,这使问题变得更加复杂,因为接下来的 2-3 操作排队等着解锁直接跑!

任何想法/想法将不胜感激!

var dataMessagesMutex sync.RWMutex
var dataMessages map[string][]*trackingPacket_v1

// Function is called from each TCP client who need to share this data
func addMessage(trackingPacket *trackingPacket_v1) {
    dataMessagesMutex.Lock()
    dataMessages[trackingPacket.packetID] = append(dataMessages[trackingPacket.packetID], trackingPacket)
    dataMessagesMutex.Unlock()
}

// Function called on a loop, need to delete based on age here
func cullMessages() {
    cullTS := time.Now().Add(-time.Second * MODES_MAX_MESSAGE_AGE)

    dataMessagesMutex.Lock()
    defer dataMessagesMutex.Unlock()

    for avr, data := range dataMessages {
        sort.Sort(PacketSorter(data))
        highestIndex := 0

        for i, messages := range data {
            if cullTS.Sub(messages.ProcessedTime) > 0 {
                // Need to delete the message here
                messages = nil
                highestIndex = i
            }
        }
        // Copy the new slice into the data variable
        data = data[highestIndex+1:]

        if len(data) == 0 {
            // Empty Messages, delete
            delete(dataMessages, avr)
        }
    }
}

更新: 新增分析功能

func processCandidates() {
    mlatMessagesMutex.RLock()
    defer dataMessagesMutex.RUnlock()

    for _, data := range dataMessages {
        numberOfMessages := len(data)
        for a := 0; a < numberOfMessages; a++ {
            packetA := data[a]
            applicablePackets := []*trackingPacket_v1{packetA}
            for b := 0; b < numberOfMessages; b++ {
                // Don't compare identical packets
                if b == a {
                    continue
                }

                packetB := data[b]

                // Only consider this packet if it's within an acceptable
                // timestamp threshold
                tsDelta := math.Abs(packetA.NormalisedTS - packetB.NormalisedTS)

                if tsDelta < MAX_MESSAGE_TS_DIFF {
                    // Finally, we need to make sure that only one message per
                    // station is included in our batch
                    stationAlreadyRepresented := false
                    for i := 0; i < len(applicablePackets); i++ {
                        if applicablePackets[i].Sharecode == packetB.Sharecode {
                            stationAlreadyRepresented = true
                        }
                    }

                    if stationAlreadyRepresented == false {

                        applicablePackets = append(applicablePackets, packetB)
                    }
                }
            }

            // Remove any stations which are deemed too close to one another
            if len(applicablePackets) >= MIN_STATIONS_NEEDED {
                applicablePackets = cullPackets(applicablePackets)
            }

            // Provided we still have enough packets....
            if len(applicablePackets) >= MIN_STATIONS_NEEDED {
                // Generate a hash for this batch...
                hash := generateHashForPackets(applicablePackets)
                batchIsUnique := true

                for _, packet := range applicablePackets {
                    if packet.containsHash(hash) {
                        batchIsUnique = false
                        break
                    }
                }

                if batchIsUnique == true {
                    for _, packet := range applicablePackets {
                        packet.addHash(hash)
                    }

                    go sendOfDataForWork(applicablePackets)
                }
            }

        }
    }
}

【问题讨论】:

  • 分析函数是一次只查看一个packetID,还是在它们之间交叉引用?
  • 感谢@andybalholm,不,它只在单个 packetID 和其中的消息中查找。我已经在上面添加了代码

标签: go


【解决方案1】:

不要使用一张大地图,而是为每个 packetID 设置一个 goroutine。调度程序 goroutine 可以有一个map[string]chan *trackingPacket_v1,并在适当的通道上发送传入的数据包。然后该 packetID 的 goroutine 会将数据包收集到一个本地切片中,并每隔一段时间对它们进行剔除和分析。

您需要以某种方式终止在 MODES_MAX_MESSAGE_AGE 中未收到数据包的 goroutine。调度程序 goroutine 可能会跟踪最近看到每个 packetID 的时间,并定期检查是否太旧。然后它将关闭这些通道并将它们从其地图中删除。当分析 goroutine 发现它的通道被关闭时,它会退出。

【讨论】:

  • 谢谢!明天我会试一试。在某种程度上,我是新来的。您能否快速展示我将如何为每个 ID 启动 goroutine?我还需要地图上的互斥锁吗?
  • 地图上的互斥体不再是必需的,因为地图对于调度程序 goroutine 来说是本地的(而不是共享的)。
  • 当调度程序发现一个它没有频道的 packetID 时,它会make 一个新频道,将其添加到它的地图中,然后执行go processPackets(packetID, ch) 之类的操作。
  • 谢谢。我不确定如何将所有 goroutines(每个套接字)从 TCP 服务器集中到调度程序。那会是另一个全球频道吗?
  • 是的,调度程序 goroutine 将从数据包通道中读取。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-05-28
  • 1970-01-01
  • 2017-07-11
  • 2013-04-07
  • 1970-01-01
  • 2011-09-24
  • 2019-06-28
相关资源
最近更新 更多