【问题标题】:How to create channels in loop?如何在循环中创建频道?
【发布时间】:2020-01-19 01:29:03
【问题描述】:

我正在学习 Go 中的并发性及其工作原理。

我想做什么?

  • 循环遍历数据片段
  • 为所需/需要的数据创建结构
  • 为该结构创建通道
  • 使用 go rutine 调用 worker func 并将该通道传递给该 rutine
  • 使用来自通道的数据做一些处理
  • 将处理后的输出设置回通道
  • 在主线程中等待从我们启动的所有通道中获取输出

我尝试过的代码

        package main

    import (
        "fmt"
        "github.com/pkg/errors"
        "time"
    )

    type subject struct {
        Name string
        Class string
        StartDate time.Time
        EndDate time.Time
    }

    type workerData struct {
        Subject string
        Class string
        Result string
        Error error
    }

    func main () {

        // Creating test data
        var subjects []subject
        st,_ := time.Parse("01/02/2016","01/01/2015")
        et,_ := time.Parse("01/02/2016","01/01/2016")
        s1 := subject{Name:"Math", Class:"3", StartDate:st,EndDate:et }
        s2 := subject{Name:"Geo", Class:"3", StartDate:st,EndDate:et }
        s3 := subject{Name:"Bio", Class:"3", StartDate:st,EndDate:et }
        s4 := subject{Name:"Phy", Class:"3", StartDate:st,EndDate:et }
        s5 := subject{Name:"Art", Class:"3", StartDate:st,EndDate:et }
        subjects = append(subjects, s1)
        subjects = append(subjects, s2)
        subjects = append(subjects, s3)
        subjects = append(subjects, s4)
        subjects = append(subjects, s5)
        c := make(chan workerData) // I am sure this is not how I should be creating channel

        for i := 0 ; i< len(subjects) ; i++ {
            go worker(c)
        }

        for _, v := range subjects {
            // Setting required data in channel
            data := workerData{Subject:v.Name, Class:v.Class}

            // set the data and start the routine
            c <- data // I think this will update data for all the routines ? SO how should create separate channel for each routine

        }

        // I want to wait till all the routines set the data in channel and return the data from workers.
        for {
            select {
                case data := <- c :
                    fmt.Println(data)
            }
        }
    }

    func worker (c chan workerData) {
        data := <- c
        // This can be any processing
        time.Sleep(100 * time.Millisecond)
        if data.Subject != "Math" {
            data.Result = "Pass"
        } else {
            data.Error = errors.New("Subject not found")
        }
        fmt.Println(data.Subject)
        // returning processed data and error to channel
        c <- data
        // Rightfully this closes channel and here after I get error send on Closed channel.
        close(c)
    }

Playgorund 链接 - https://play.golang.org/p/hs1-B1UR98r

我面临的问题

我不确定如何为每个数据项创建不同的通道。我目前正在做的方式将更新所有例程的通道数据。我想知道有没有办法为循环中的每个数据项创建不同的通道并将其传递给 go rutine。然后在 main rutine 中等待从所有通道的 rutines 中获取结果。

任何指针/帮助会很棒吗?如果有任何困惑,请随时发表评论。

【问题讨论】:

  • 可能有一个小的(即 O(|CPU|),独立于 len(subjects) 工人池是正确的方法。但如果你真的希望每个主题有一个工人:做 不使用通道,您可以使用适当的数据直接启动工作程序。收集结果应始终使用 sync.Waitgroup 完成。有关并发性的示例,请参见 blog.golang.org。您尝试做什么do 看起来很可疑(即使这是一个简单的学习经历)。
  • 您可能想要使用两个不同的通道,一个用于发送作业,另一个用于收集本示例中提到的结果 - gobyexample.com/worker-pools。如果您需要更多帮助,请告诉我。
  • 如果你为每个值创建一个worker,为什么要在通道上发送值呢?只需将其作为参数传递。而且你永远不应该在同一个频道上输入和输出结果。

标签: go channel goroutine


【解决方案1】:

"//我认为这会更新所有例程的数据?"

通道(为了简化)不是存储数据的数据结构。

它是一种通过不同 goroutine发送接收数据的结构。

因此,请注意您的工作函数在每个 goroutine 实例中的同一通道上进行发送和接收。如果您只有一个此类工作者的实例,这将死锁 (https://golang.org/doc/articles/race_detector.html)。

在您发布的代码版本中,对于初学者来说,这似乎可行,因为您有许多工人相互交换作品。但是正确的程序是错误的。

因此,如果一个 worker 不能读写同一个通道,那么它必须消耗一个特定的可写通道来将其结果发送到其他一些例程。

// 我想等到所有例程都在通道中设置数据并 从工人那里返回数据。

这是确保推送器等待其所有工作人员完成工作后再继续进行所需的同步机制的一部分。 (这篇博文谈到了它https://medium.com/golangspec/synchronized-goroutines-part-i-4fbcdd64a4ec

// 正确地这会关闭通道,并在我收到错误发送后在这里关闭 关闭频道。

请注意您有 n 个并行执行的工作程序例程。第一个到达其函数末尾的 worker 将关闭通道,使其对其他 worker 不可写,并向 main 发出错误信号。

通常在写入端使用 close 语句来指示通道中没有更多数据。表示已经结束。阅读器使用此信号退出频道的读取等待操作。

作为一个例子,让我们回顾一下这个循环

    for {
        select {
            case data := <- c :
                fmt.Println(data)
        }
    }

很糟糕,真的很糟糕。

  1. 这是一个没有退出语句的无限循环
  2. select 是多余的,并且不包含 exit 语句,请记住在通道上读取是阻塞操作。
  3. 这是对语言提供的标准模式的糟糕重写,通道上的范围循环

通道上的范围循环写得很简单

    for data := range c {
        fmt.Println(data)
    }

这种模式有一个很大的优势,它会自动检测一个关闭的通道以退出循环!让您只循环处理要处理的相关数据。它也更简洁。

此外,您的工作人员很尴尬,因为它在退出之前只读取和写入一个元素。 生成 goroutine 很便宜,但不是免费的。您应该始终评估异步处理的成本与其实际工作负载之间的权衡。

总体而言,您的代码应该更接近此处演示的内容 https://gobyexample.com/worker-pools

【讨论】:

  • 感谢您的详细描述。它确实指出了我的方法中的流程。我需要在 go 中再次理解并发模式。很抱歉问,但你能推荐一本好书/博客,它可以帮助理解 go for 新手的并发性。
  • 特别是关于并发,nop。但 goperhcon 会定期提供有关该主题的视频 youtube.com/results?search_query=gophercon 。查看 gotour 和这个 gobyexample.com 。如果你知道你的模式,你应该能够实现你所需要的
猜你喜欢
  • 1970-01-01
  • 2020-09-16
  • 2014-06-19
  • 1970-01-01
  • 2011-06-28
  • 2019-06-16
  • 1970-01-01
  • 2020-08-12
  • 1970-01-01
相关资源
最近更新 更多