【问题标题】:Golang share big chunk of data between goroutinesGolang 在 goroutine 之间共享大量数据
【发布时间】:2016-09-19 03:59:18
【问题描述】:

我需要从另一个 goroutine 读取结构字段集,afaik 直接这样做,即使知道肯定会有没有并发访问(在读取发生之前写入完成,通过 chan struct{} 发出信号) 可能会导致数据过时

考虑到我可以保证没有并发访问,将 指针 发送到结构(在第一个 goroutine 中创建,在第二个 goroutine 中修改,由第三个读取)会解决可能的过时问题吗? p>

我想避免复制,因为结构很大并且包含在第二个 goroutine 中填充的巨大 Bytes.Buffer,我需要从第三个开始读取

有一个锁定选项,但考虑到我知道不会有并发访问,这似乎有点矫枉过正

【问题讨论】:

  • I know that there will be no concurrent access 所以 goroutine 没有同时运行?
  • 它们确实同时操作,但我可以通过简单快速的chan struct{} 轻松同步读/写,以确保在写完成后进行读取,即阅读器将等待作者完成修改结构字段(写入 Bytes.Buffer 并在结构中设置一些简单的字段(int))。按照设计,没有并发访问,但读取器和写入器总是位于不同的 goroutine 中
  • 如果没有并发访问,并且每个 goroutine 共享内存中相同结构的地址,那么就没有陈旧的理由。
  • 始终保护可同时访问的数据。如果没有争用,那么性能损失无论如何都可以忽略不计。不要依赖假设,它会导致难以调试的问题。另一方面,如果锁定导致性能问题,您可以随时调整代码。

标签: go concurrency channels


【解决方案1】:

这个有很多答案,这取决于你的数据结构和程序逻辑。

见:How to lock/synchronize access to a variable in Go during concurrent goroutines?
和:How to use RWMutex in Golang?

1- 使用Stateful Goroutines 和频道
2- 使用sync.Mutex
3- 使用同步/原子
4- 使用 WaitGroup
5- 使用程序逻辑(Semaphore)
...


1:Stateful Goroutines 和频道:
我模拟了非常相似的示例(假设您想从一个 SSD 读取并以不同的速度写入另一个 SSD):
在这个示例代码中,一个 goroutine(名为 write)做一些工作准备数据并填充大结构,另一个 goroutine(名为 read)从大结构中读取数据然后做一些工作,而 manger goroutine 保证不会并发访问相同的数据. 三个 goroutine 之间的通信是通过通道完成的。在您的情况下,您可以将指针用于通道数据,或像此示例这样的全局结构。
输出将是这样的:
平均值= 36.6920166015625 标准差= 6.068973186592054

我希望这可以帮助您了解这个想法。
工作示例代码:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "runtime"
    "sync"
    "time"
)

type BigStruct struct {
    big     []uint16
    rpos    int
    wpos    int
    full    bool
    empty   bool
    stopped bool
}

func main() {
    wg.Add(1)
    go write()
    go read()
    go manage()
    runtime.Gosched()
    stopCh <- <-time.After(5 * time.Second)
    wg.Wait()
    mean := Mean(hist)
    stdev := stdDev(hist, mean)
    fmt.Println("mean=", mean, "stdev=", stdev)
}

const N = 1024 * 1024 * 1024

var wg sync.WaitGroup
var stopCh chan time.Time = make(chan time.Time)

var hist []int = make([]int, 65536)

var s *BigStruct = &BigStruct{empty: true,
    big: make([]uint16, N), //2GB
}

var rc chan uint16 = make(chan uint16)
var wc chan uint16 = make(chan uint16)

func next(pos int) int {
    pos++
    if pos >= N {
        pos = 0
    }
    return pos
}

func manage() {
    dataReady := false
    var data uint16
    for {
        if !dataReady && !s.empty {
            dataReady = true
            data = s.big[s.rpos]
            s.rpos++
            if s.rpos >= N {
                s.rpos = 0
            }
            s.empty = s.rpos == s.wpos
            s.full = next(s.wpos) == s.rpos
        }
        if dataReady {
            select {
            case rc <- data:
                dataReady = false
            default:
                runtime.Gosched()
            }
        }
        if !s.full {
            select {
            case d := <-wc:
                s.big[s.wpos] = d
                s.wpos++
                if s.wpos >= N {
                    s.wpos = 0
                }
                s.empty = s.rpos == s.wpos
                s.full = next(s.wpos) == s.rpos
            default:
                runtime.Gosched()
            }
        }
        if s.stopped {
            if s.empty {
                wg.Done()
                return
            }
        }

    }
}

func read() {
    for {
        d := <-rc
        hist[d]++
    }
}

func write() {
    for {
        wc <- uint16(rand.Intn(65536))
        select {
        case <-stopCh:
            s.stopped = true
            return
        default:
            runtime.Gosched()
        }
    }
}

func stdDev(data []int, mean float64) float64 {
    sum := 0.0
    for _, d := range data {
        sum += math.Pow(float64(d)-mean, 2)
    }
    variance := sum / float64(len(data)-1)
    return math.Sqrt(variance)
}
func Mean(data []int) float64 {
    sum := 0.0
    for _, d := range data {
        sum += float64(d)
    }
    return sum / float64(len(data))
}

5:某些用例的另一种方式(更快):
这是另一种使用共享数据结构进行读取作业/写入作业/处理作业的方法,它在第一篇文章中被分开,现在这里做同样的 3 个作业没有通道和没有 mutex .

工作样本:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

type BigStruct struct {
    big     []uint16
    rpos    int
    wpos    int
    full    bool
    empty   bool
    stopped bool
}

func manage() {
    for {
        if !s.empty {
            hist[s.big[s.rpos]]++ //sample read job with any time len
            nextPtr(&s.rpos)
        }
        if !s.full && !s.stopped {
            s.big[s.wpos] = uint16(rand.Intn(65536)) //sample wrire job with any time len
            nextPtr(&s.wpos)
        }
        if s.stopped {
            if s.empty {
                return
            }
        } else {
            s.stopped = time.Since(t0) >= 5*time.Second
        }
    }
}

func main() {
    t0 = time.Now()
    manage()
    mean := Mean(hist)
    stdev := StdDev(hist, mean)
    fmt.Println("mean=", mean, "stdev=", stdev)
    d0 := time.Since(t0)
    fmt.Println(d0) //5.8523347s
}

var t0 time.Time

const N = 100 * 1024 * 1024

var hist []int = make([]int, 65536)

var s *BigStruct = &BigStruct{empty: true,
    big: make([]uint16, N), //2GB
}

func next(pos int) int {
    pos++
    if pos >= N {
        pos = 0
    }
    return pos
}
func nextPtr(pos *int) {
    *pos++
    if *pos >= N {
        *pos = 0
    }

    s.empty = s.rpos == s.wpos
    s.full = next(s.wpos) == s.rpos
}

func StdDev(data []int, mean float64) float64 {
    sum := 0.0
    for _, d := range data {
        sum += math.Pow(float64(d)-mean, 2)
    }
    variance := sum / float64(len(data)-1)
    return math.Sqrt(variance)
}
func Mean(data []int) float64 {
    sum := 0.0
    for _, d := range data {
        sum += float64(d)
    }
    return sum / float64(len(data))
}

【讨论】:

  • 对于这么简单的事情,代码看起来太复杂了。 go 中的互斥锁正是因为这个原因而使用的——它会更简单,甚至可能更快。不要仅仅因为可以就使用渠道。此外,大多数时候使用runtime.Gosched() 意味着你做错了。你不需要它,从select 语句中删除default 案例。同时删除select 语句,只使用一个通道操作并直接使用通道
  • 我可以看到您的代码使用selectruntime.Gosched() 来访问频道而不会阻塞,但同样,您做错了。整个示例太复杂、不必要、脆弱,根本不是 Go 的做事方式。由于runtime.Gosched(),我不确定当 GOMAXPROCS 为 1 时它会如何表现。
  • 你测量了性能吗?我也不太确定互斥锁会更快。通道本身很昂贵,如果没有争用,互斥锁很便宜。但无论如何都没关系,互斥锁会让代码更简单,这更重要。在基准测试告诉您通道更快并且您可以证明为什么性能优势值得损害可读性之前,请使用互斥锁
  • github.com/golang/go/wiki/MutexOrChannelUse whichever is most expressive and/or most simple。这是几乎所有事情的通用规则,直到您有充分的理由需要使用更复杂的解决方案。但这不是您的代码的唯一问题,它本身并不是一个好的 Go 代码。
  • @creker:这被称为有状态 Goroutines:gobyexample.com/stateful-goroutines 如果你注释掉所有 runtime.Gosched() time=5.8523347s 及以上代码 time=5.0052863s 使用 runtime.Gosched() 会更快所以我保留它调度程序被指示将执行切换到另一个goroutine。这正是这里需要的,缓冲区是空的,需要另一个 goroutine 来填充缓冲区。关于频道我不同意你的看法。对 Golang 和并发来说非常基础,我在 Golang 之前一直在使用它:(Erlang Message Passing)它非常简单和便宜。
【解决方案2】:

为了防止对结构的并发修改同时保留读取能力,您通常会嵌入sync.RWMutex。这不是豁免。您可以在传输过程中简单地 lock your struct for writes 并在您方便的时间点将其解锁。

package main

import (
    "fmt"
    "sync"
    "time"
)

// Big simulates your big struct
type Big struct {
    sync.RWMutex
    value string
}

// pump uses a groutine to take the slice of pointers to Big,
// locks the underlying structs and sends the pointers to
// the locked instances of Big downstream
func pump(bigs []*Big) chan *Big {

    // We make the channel buffered for this example
    // for illustration purposes
    c := make(chan *Big, 3)

    go func() {
        for _, big := range bigs {
            // We lock the struct before sending it to the channel
            // so it can not be changed via pointer while in transit
            big.Lock()
            c <- big
        }
        close(c)
    }()

    return c
}

// sink reads pointers to the locked instances of Big
// reads them and unlocks them
func sink(c chan *Big) {

    for big := range c {
        fmt.Println(big.value)
        time.Sleep(1 * time.Second)
        big.Unlock()

    }
}

// modify tries to achieve locks to the instances and modify them
func modify(bigs []*Big) {
    for _, big := range bigs {

        big.Lock()
        big.value = "modified"
        big.Unlock()
    }
}

func main() {

    bigs := []*Big{&Big{value: "Foo"}, &Big{value: "Bar"}, &Big{value: "Baz"}}
    c := pump(bigs)

    // For the sake of this example, we wait until all entries are
    // send into the channel and hence are locked
    time.Sleep(1 * time.Second)

    // Now we try to modify concurrently before we even start to read
    // the struct of which the pointers were sent into the channel
    go modify(bigs)
    sink(c)

    // We use sleep here to keep waiting for modify() to finish simple.
    // Usually, you'd use a sync.waitGroup
    time.Sleep(1 * time.Second)

    for _, big := range bigs {
        fmt.Println(big.value)
    }

}

Run on playground

【讨论】:

    猜你喜欢
    • 2017-01-05
    • 1970-01-01
    • 1970-01-01
    • 2019-04-08
    • 1970-01-01
    • 2016-07-31
    • 2016-05-07
    • 2016-08-15
    • 2019-07-02
    相关资源
    最近更新 更多