【问题标题】:Serialize goroutines (parallelize but guarantee ordering)序列化 goroutines(并行但保证排序)
【发布时间】:2018-12-11 22:20:27
【问题描述】:

假设我们要并行处理一些计算,但我们必须保证结果的顺序与计算的顺序相同:

这可以通过例如:

https://play.golang.org/p/jQbo0EVLzvX

package main

import (
    "fmt"
    "time"
)

func main() {
    orderPutChans := make([]chan bool, 8)
    orderGetChans := make([]chan bool, 8)
    doneChans := make([]chan bool, 8)

    for i := 0; i < 8; i++ {
        orderPutChans[i] = make(chan bool, 1)
        orderGetChans[i] = make(chan bool)
        doneChans[i] = make(chan bool)
    }

    srcCh := make(chan int)
    dstCh := make(chan int)

    for i := 0; i < 8; i++ {
        go func(j int) {
            myGetCh := orderGetChans[j]
            nextGetCh := orderGetChans[(j+1) % 8]
            myPutCh := orderPutChans[j]
            nextPutCh := orderPutChans[(j+1) % 8]

            for {
                _ = <- myGetCh

                v, ok := <- srcCh

                if !ok {
                    k := (j + 1) % 8
                    if orderGetChans[k] != nil {
                            orderGetChans[k] <- true
                    }
                    orderGetChans[j] = nil

                    break
                }

                nextGetCh <- true

                time.Sleep(1000)

                v *= v

                _ = <- myPutCh

                dstCh <- v

                nextPutCh <- true
            }

            doneChans[j] <- true
        }(i)
    }

    go func() {
        for i := 0; i < 8; i++ {
            _ = <- doneChans[i]
        }
        close(dstCh)
    }()

    orderGetChans[0] <- true
    orderPutChans[0] <- true

    go func() {
        for i := 0; i < 100; i++ {
            srcCh <- i
        }
        close(srcCh)
    }()

    for vv := range dstCh {
        fmt.Println(vv)
    }
}

可以使用通道来传递通道的读/写权限。代码很乱,看起来也不是很整洁。 Go 中是否有更简洁的方法来实现这一目标?

编辑: 我不是要求“简单”的替换,例如使用chan struct{} 或在doneChans 上使用close 以支持doneChans[i] &lt;- true

编辑2

一个更简单的方法(至少就代码而言)是有一个results 数组,消费者将数据连同一个索引(这将是工作人员的 mod 数)一起发送,然后 goroutine 写入将结果发送到results[j],然后有一个 WaitGroup 等待所有操作完成(一批或多批),然后遍历结果并将它们发送到目标通道。 (可能因为虚假分享而不太好?)

【问题讨论】:

  • 如果您希望结果的顺序与输入的顺序相同,请为它们提供某种索引,并在所有结果准备好后对其进行排序。并发操作的顺序是不确定的。
  • 如果您有千兆字节的数据,等待 all 结果准备好是不可行的。当然可以将其分成更小的块,处理它们,然后对这些更小的块进行排序。当然,您也可以在我的版本中将内容作为“批量计算”发送。 (即发送一个整数数组,当然,这只是演示)。
  • > 并发操作的顺序是不确定的。这当然是真的。您可能会将计算拆分为可以彼此独立运行的部分,但随后必须以结果的顺序很重要并且必须与输入值的顺序相同的方式处理结果。
  • 正确。需要一些同步来提供预定义的结果顺序。您可以在一个线程中按顺序执行所有操作;您可以多线程执行所有操作,然后在单个线程中进行排序,或者按照您的建议分批执行所有操作,在单个线程中对批次进行连续排序,并使用锁定来确保批次按顺序交付。为实现可预测的并发操作顺序所做的任何事情都需要同步,从而增加复杂性并降低并发性。
  • 所以你不能,正如问题标题所说,“序列化 goroutines(并行化但保证排序)”。你不能一边吃一边吃蛋糕:要么是同步的,要么是并行的;要么是保证顺序 要么 异步。

标签: go parallel-processing


【解决方案1】:

如果我理解正确,这是您使用“管道”样式的代码版本。管道中有许多步骤:

  1. 发送 src 值
  2. 在接收到的 src 值中工作的工作人员,发送到自己的结果通道
  3. 将来自工作人员的结果通道切片合并到单个无序通道中
  4. 对无序合并通道中的无序值进行排序

这是代码,它使用您在对原始问题的编辑中提到的索引对样式。

type idxPair struct {
    idx, val int
}

func main() {
    // add a done channel, an ability to stop the world by closing this.
    done := make(chan struct{})
    defer close(done)

    // create srcChan, this will be where the values go into the pipeline
    srcCh := make(chan idxPair)

    // create a slice of result channels, one for each of the go workers
    const numWorkers = 8
    resChans := make([]<-chan idxPair, numWorkers)

    // waitgroup to wait for all the workers to stop
    var wg sync.WaitGroup
    wg.Add(numWorkers)

    // start the workers, passing them each the src channel,
    // collecting the result channels they return
    for i := 0; i < numWorkers; i++ {
        resChans[i]  = worker(done, &wg, srcCh)
    }

    // start a single goroutine to send values into the pipeline
    // all values are sent with an index, to be pieces back into order at the end.
    go func() {
        defer close(srcCh)
        for i := 1; i < 100; i++ {
            srcCh <- idxPair{idx: i, val: i}
        }
    }()

    // merge all the results channels into a single results channel
    // this channel is unordered.
    mergedCh := merge(done, resChans...)

    // order the values coming from the mergedCh according the the idxPair.idx field.
    orderedResults := order(100, mergedCh)

    // iterate over each of the ordered results
    for _, v := range orderedResults {
        fmt.Println(v)
    }
}

func order(len int, res <-chan idxPair) []int {
    results := make([]int, len)

    // collect all the values to order them
    for r := range res {
        results[r.idx] = r.val
    }

    return results
}

func worker(done <- chan struct{}, wg *sync.WaitGroup, src <-chan idxPair) <-chan idxPair {
    res := make(chan idxPair)

    go func() {
        defer wg.Done()
        defer close(res)
        sendValue := func(pair idxPair) {
            v := pair.val
            v *= v
            ip := idxPair{idx: pair.idx, val: v}
            select {
            case res <- ip:
            case <-done:
            }
        }

        for v := range src{
             sendValue(v)
        }
    }()

    return res
}


// example and explanation here: https://blog.golang.org/pipelines
func merge(done <-chan struct{}, cs ...<-chan idxPair) <-chan idxPair {
    var wg sync.WaitGroup
    out := make(chan idxPair)

    output := func(c <-chan idxPair) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

我认为这更干净,而不仅仅是“为了它而不同”的原因是:

  1. 您可以独立建模和实现每个阶段。 order 阶段可以轻松优化,以便在收到值时通过通道发送值等。
  2. 它的组合性更强;您可以对元素进行异步工作,而不是使用一种对存储在数组中的多个通道进行操作的大型方法,并将排序留作其他事情。这促进了重复使用。

【讨论】:

  • 非常感谢您的意见。这看起来很棒。在查看它时,我有另一个想法,它基本上围绕一个缓冲结果,一旦你有了下一个索引的结果,你就将它发送给结果 chan。 (本质上,您如何使用数据包编号订购 UDP 数据包。您缓冲具有数据包编号 > 下一个数据包的数据包,一旦收到应该是下一个数据包的数据包,您就处理它,然后在缓冲区中向前扫描,直到第一个“间隙”和然后冲洗并重复)。
  • 我想我会构建一些基准案例,然后尝试 cmets/answers 中提到的所有方法并发布结果。看看什么获得最大的吞吐量会很有趣。
  • 好主意,我脑子里也有类似的“缓冲”想法;为简单起见省略。但可能效果很好。我希望上面的想法足够灵活,可以轻松地整合这些优化!
猜你喜欢
  • 2017-08-18
  • 1970-01-01
  • 2015-06-25
  • 1970-01-01
  • 2017-02-10
  • 2016-11-08
  • 2018-06-14
  • 2021-04-10
相关资源
最近更新 更多