【问题标题】:Golang worker pool causes database corruptionGolang 工作池导致数据库损坏
【发布时间】:2020-10-11 23:38:32
【问题描述】:

我一直在 Golang 中创建一个模拟器类型的系统,我想使用 InfluxDB 2.0 的 Golang 客户端存储来自该系统的数据。但是,想看看我是否可以通过 goroutine 提高数据库写入的速度。

由于为每个数据点设置一个 goroutine 会导致 InfluxDB2 不堪重负,我决定实现一个工作池系统​​来限制使用的 goroutine 的数量。然而,当数据通过工作池写入数据库时​​,它总是会被破坏,出现奇怪的峰值和以前不存在的值变化(而不是直线,它会摇摆不定) .)

我通过一个名为 Simulate 的函数来执行此操作,该函数接收一个时间值(用于时间序列数据库)、一个实体结构指针(包含要模拟的所有数据)和两个独立的客户端,每个客户端都写入一组不同的数据。

maxNumGoroutines := flag.Int("maxNumGoroutines", 10, "Max num of goroutines")
flag.Parse()

concurrentGoroutines := make(chan struct{}, *maxNumGoroutines) // Semaphore 
var wg sync.WaitGroup // Wait for goroutines to finish

timeLength := setTimeLength(inputVar) // Example of setting length of time

simObjects:= &entities.Objects // objects are propagated as *Object, meaning no return value
// Additional entities also exist inside the entities struct

for timeIterator := 0; timeIterator <= timeLength; timeIterator++ { 
    for _, objectID:= range entities.Objects.GetObjectIDs() { // all objects within the simulation
        wg.Add(1)
        go func(entityState *entities.EntityHolder, chosenObj string, timeGo time.Time) { 
            defer wg.Done()
            concurrentGoroutines <- struct{}{} // Set goroutine as busy
            calc.Propagate(entityState.Objects.GetObject(chosenObj), timeGo) // Edit the value at pointer addr

            calc.Metrics(entityState, chosenObj, timeGo, metricDB) // seperate further calcs and write

            PassInflux(entityState.Objects.GetObject(chosenObj), clientDB, timeGo) // send propagated data 
            <-concurrentGoroutines // Free up goroutine
        }(entities, objectID, timeIterator) // pass in as variables, otherwise operating on changing pointers
    }
}
wg.Wait()
log.Println("Simulation complete.")

Metrics() 写入是非阻塞的,这意味着它可以异步写入。但是,我flush Propagate() 数据以确保在所有对象都传播完之后发送数据;否则,会尝试一次写入太多对象(即使数据库使用 5000 点的批量大小)。

我在这里遗漏了什么吗?有没有一种名义上的方法来设置一个带有指针的工作池?

【问题讨论】:

    标签: database pointers go pool goroutine


    【解决方案1】:

    事实证明,我错误地使用了 InfluxDB 2 客户端 - 我应该将写入 API 传递到传播函数而不是整个客户端。这样做意味着写入速度显着加快;因此,goroutine 不是必需的。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-09-17
      • 2013-11-04
      • 1970-01-01
      • 1970-01-01
      • 2021-10-17
      • 2014-03-17
      相关资源
      最近更新 更多