【发布时间】:2020-10-11 23:38:32
【问题描述】:
我一直在 Golang 中创建一个模拟器类型的系统,我想使用 InfluxDB 2.0 的 Golang 客户端存储来自该系统的数据。但是,想看看我是否可以通过 goroutine 提高数据库写入的速度。
由于为每个数据点设置一个 goroutine 会导致 InfluxDB2 不堪重负,我决定实现一个工作池系统来限制使用的 goroutine 的数量。然而,当数据通过工作池写入数据库时,它总是会被破坏,出现奇怪的峰值和以前不存在的值变化(而不是直线,它会摇摆不定) .)
我通过一个名为 Simulate 的函数来执行此操作,该函数接收一个时间值(用于时间序列数据库)、一个实体结构指针(包含要模拟的所有数据)和两个独立的客户端,每个客户端都写入一组不同的数据。
maxNumGoroutines := flag.Int("maxNumGoroutines", 10, "Max num of goroutines")
flag.Parse()
concurrentGoroutines := make(chan struct{}, *maxNumGoroutines) // Semaphore
var wg sync.WaitGroup // Wait for goroutines to finish
timeLength := setTimeLength(inputVar) // Example of setting length of time
simObjects:= &entities.Objects // objects are propagated as *Object, meaning no return value
// Additional entities also exist inside the entities struct
for timeIterator := 0; timeIterator <= timeLength; timeIterator++ {
for _, objectID:= range entities.Objects.GetObjectIDs() { // all objects within the simulation
wg.Add(1)
go func(entityState *entities.EntityHolder, chosenObj string, timeGo time.Time) {
defer wg.Done()
concurrentGoroutines <- struct{}{} // Set goroutine as busy
calc.Propagate(entityState.Objects.GetObject(chosenObj), timeGo) // Edit the value at pointer addr
calc.Metrics(entityState, chosenObj, timeGo, metricDB) // seperate further calcs and write
PassInflux(entityState.Objects.GetObject(chosenObj), clientDB, timeGo) // send propagated data
<-concurrentGoroutines // Free up goroutine
}(entities, objectID, timeIterator) // pass in as variables, otherwise operating on changing pointers
}
}
wg.Wait()
log.Println("Simulation complete.")
Metrics() 写入是非阻塞的,这意味着它可以异步写入。但是,我flush Propagate() 数据以确保在所有对象都传播完之后发送数据;否则,会尝试一次写入太多对象(即使数据库使用 5000 点的批量大小)。
我在这里遗漏了什么吗?有没有一种名义上的方法来设置一个带有指针的工作池?
【问题讨论】:
标签: database pointers go pool goroutine