【问题标题】:Shared resources with channels in google go与 google go 中的频道共享资源
【发布时间】:2014-01-17 06:42:48
【问题描述】:

我正在查看Google Go 语言,因为我正在构建一个实时系统,我发现通过渠道共享资源有点令人困惑。为了理解,我试图让不同的goroutines 以相同的次数递增和递减一个共享值,最终为 0。我知道我的代码是错误的,但我并没有真正得到它的窍门。有人愿意解释这里出了什么问题吗?

package main

import (
    . "fmt"
    . "runtime"
)

func increment(c chan int) {
    for x := 0; x < 10; x++ {
        a := <-c
        a++
        c <- a
    }
}

func decrement(c chan int) {
    for x := 0; x < 10; x++ {
        a := <-c
        a--
        c <- a
    }
}

func main() {
    GOMAXPROCS(NumCPU())
    c := make(chan int)
    go increment(c)
    go decrement(c)
    Println(<-c)
}

我可以使用互斥锁或信号量,类似于我使用CPython 所做的,尽管我想利用Go 中的通道。

**更新

添加WaitGroup 会改变程序流程吗?我添加了一个WaitGroup,效果很好。虽然,我在整个for循环之后添加了Done()函数,那么整个increment会在decrement之前运行吗?我有点希望它们尽可能“并行”运行,我知道只有一个例程可以访问我,但我希望它们彼此独立运行。

【问题讨论】:

  • Go 为您提供了多种同步方式:通道、互斥体、等待组、原子。通道非常适合 goroutine 之间的通信、向工作人员发送数据等。使用什么取决于任务。使用最清晰的代码。

标签: concurrency parallel-processing synchronization go


【解决方案1】:

您的代码存在一些问题:

  1. 两个 goroutine 尝试同时从通道中读取。这是一个死锁,因为通道中没有可读取的内容。

  2. Println(&lt;-c) 从通道读取一个值,而不是结果。如果您等待两个 goroutine 完成,它可能会读取结果,但这需要添加 WaitGroup。等待组就像一个信号量,允许每个 goroutine 递减挂起的 goroutine 的计数器,并允许调用者等待它们完成某些任务。

  3. 因为如果没有阅读器,则发送阻塞,如果没有发送器,则读取阻塞,你是一个。等待两个 goroutine 先完成,然后 b.读比写多一次(Println 读),你需要一个buffered channel,它在缓冲区中正好有一个额外的位置。

  4. 您需要在通道中推送一个初始值才能启动进程。

我对你的代码做了一些改动,这个例子现在可以工作了(虽然注意它不是真正的增量->减量->增量->....而是增量->增量->...->减量->递减->....直到我们完成。

package main

import (
    . "fmt"
    . "runtime"
    "sync"
)

func increment(c chan int, wg *sync.WaitGroup) {
    for x := 0; x < 10; x++ {
        a := <-c
        Println("increment read ", a)
        a++
        c <- a
    }
    Println("Incrment done!")
    wg.Done()
}

func decrement(c chan int, wg *sync.WaitGroup) {
    for x := 0; x < 10; x++ {
        a := <-c
        Println("Decrement read ", a)       
        a--
        c <- a
    }
    Println("Dencrment done!")  
    wg.Done()
}

func main() {
    GOMAXPROCS(NumCPU())

    //we create a buffered channel with 1 extra space. This means 
    //you can send one extra value into it if there is no reader, allowing for the final result to be pushed to println
    c := make(chan int, 1)

    //we create a wait group so we can wait for both goroutines to finish before reading the result
    wg := sync.WaitGroup{}
    wg.Add(1) //mark one started
    go increment(c, &wg)
    wg.Add(1) //mark another one started. We can just do Add(2) BTW
    go decrement(c, &wg)

    //now we push the initial value to the channel, starting the dialog
    c <- 0

    //let's wait for them to finish...
    wg.Wait()

    //now we have the result in the channel's buffer
    Println("Total: ", <-c )
}

【讨论】:

  • 无论如何,虽然这个例子展示了通道是如何工作的,但在这种情况下我不会使用通道。或者我至少会以不同的方式使用它们。作为一个用例,它没有多大意义。
  • 您的实际用例是什么?当然你不会增加和减少数字:)
  • 我实际上没有用例,我只是想掌握它的窍门。我想我现在要说声谢谢并继续阅读!
  • @Martol1ni 我建议观看此演讲,以更好地了解渠道和并发模式youtube.com/watch?v=QDDwwePbDtw
【解决方案2】:

Here is a complete example 我认为您正在谈论的那种共享状态引擎

注意使用WaitGroup,正如您在编辑中建议的那样,以同步两个频道。

PS 不要使用import . "fmt" 这被认为是不好的做法。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// Commands for the engine
const (
    INC = iota
    DEC
    ANSWER
    QUIT
)

// Engine which takes commands and acts on some shared state
func engine(c chan int, reply chan int) {
    counter := 0
    for {
        switch <-c {
        case INC:
            counter++
        case DEC:
            counter--
        case ANSWER:
            reply <- counter
        case QUIT:
            reply <- counter
            return

        }
    }
}

// Add n times then signal done via the waitgroup
func increment(n int, c chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for x := 0; x < n; x++ {
        c <- INC
    }
}

// Subtract n times then signal done
func decrement(n int, c chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for x := 0; x < n; x++ {
        c <- DEC
    }
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    // Start the engine
    c := make(chan int)
    reply := make(chan int)
    go engine(c, reply)

    // Do adding and subtracting and wait for them to finish
    wg := new(sync.WaitGroup)
    wg.Add(2)
    go increment(101, c, wg)
    go decrement(100, c, wg)
    wg.Wait()

    // Read the answer
    c <- ANSWER
    fmt.Printf("Total is %d\n", <-reply)

    // Stop the engine
    c <- QUIT
    <-reply
    fmt.Printf("All done\n")
}

【讨论】:

    猜你喜欢
    • 2013-09-03
    • 1970-01-01
    • 2021-02-10
    • 1970-01-01
    • 2014-06-27
    • 2018-12-19
    • 2014-03-02
    • 2021-10-10
    • 2015-10-27
    相关资源
    最近更新 更多