【问题标题】:Why parallelization slows down my program?为什么并行化会减慢我的程序速度?
【发布时间】:2020-03-01 23:52:02
【问题描述】:

我有两个程序。他们解决了一个线性方程组。它们都可以正常工作(它们产生相同的结果)。

第一个程序在没有并发的情况下工作。

第二个程序与第一个程序非常相似,不同之处在于我在某些地方添加了并行性。这些地方都用代码标出来了。

这里有两个程序:

第一个。没有并发。

package main

import (
        "fmt"
        "math"
        "os"
        "time"
)

func main() {
        start := time.Now()
        N := 1000
        a := CreateRandomMatrix(N)
        b := CreateRandomVector(N)

        index := make([]int, len(a))
        for i := range index {
            index[i] = i
        }

        for i := 0; i < len(a); i++ {

            r := a[i][index[i]]

            var kk int

            var maxElemInRow float64
            for k := i; k < len(a); k++ {
                if math.Abs(a[i][index[k]]) > maxElemInRow {
                    kk = k
                    maxElemInRow = math.Abs(a[i][index[k]])
                }
            }

            index[i], index[kk] = index[kk], index[i]

            r = a[i][index[i]]

            if r == 0 {
                if b[i] == 0 {
                    fmt.Println("a lot of solutions")
                } else {
                    fmt.Println("no solutions")
                }
                os.Exit(1)
            }

            for j := 0; j < len(a[i]); j++ {
                a[i][index[j]] /= r
            }
            b[i] /= r

            for k := i + 1; k < len(a); k++ {
                r = a[k][index[i]]
                for j := 0; j < len(a[i]); j++ {
                    a[k][index[j]] = a[k][index[j]] - a[i][index[j]]*r
                }
                b[k] = b[k] - b[i]*r
            }

        }

        var x vector = make(vector, len(b))

        for i := len(a) - 1; i >= 0; i-- {
            x[i] = b[i]

            for j := i + 1; j < len(a); j++ {
                x[i] = x[i] - (x[j] * a[i][index[j]])
            }
        }

        result := make([]string, len(x))
        for i, val := range index {
            result[val] = fmt.Sprintf("%.2f", x[i])
        }
        fmt.Println("tested part took:", time.Now().Sub(start))
    }

第二个:

package main

import (
    "fmt"
    "math"
    "os"
    "sync"
    "time"
)

const (
    workers = 8
)

var wg sync.WaitGroup

func main() {

    start := time.Now()
    N := 1000
    a := CreateRandomMatrix(N)
    b := CreateRandomVector(N)

    index := make([]int, len(a))
    for i := range index {
        index[i] = i
    }

    for i := 0; i < len(a); i++ {

        r := a[i][index[i]]

        var kk int
        var max float64

        for k := i; k < len(a); k++ {
            if math.Abs(a[i][index[k]]) > max {
                kk = k
                max = math.Abs(a[i][index[k]])
            }
        }

        index[i], index[kk] = index[kk], index[i]

        r = a[i][index[i]]

        if r == 0 {
            if b[i] == 0 {
                fmt.Println("a lot of solutions")
            } else {
                fmt.Println("no solutions")
            }
            os.Exit(1)
        }

        // concurrency here
        for w := 0; w < workers; w++ {
            wg.Add(1)
            go func(w int) {
                start := len(a[i]) / workers * w
                end := len(a[i]) / workers * (w + 1)

                if end > len(a[i]) {
                    end = len(a[i])
                }

                for j := start; j < end; j++ {
                    a[i][index[j]] /= r
                }
                wg.Done()
            }(w)
        }

        b[i] /= r
        wg.Wait()

        for k := i + 1; k < len(a); k++ {
            r = a[k][index[i]]
            for j := 0; j < len(a[i]); j++ {
                a[k][index[j]] = a[k][index[j]] - a[i][index[j]]*r
            }
            b[k] = b[k] - b[i]*r
        }

    }

    var x vector = make(vector, len(b))

    for i := len(a) - 1; i >= 0; i-- {
        x[i] = b[i]

        for j := i + 1; j < len(a); j++ {
            x[i] = x[i] - (x[j] * a[i][index[j]])
        }
    }

    result := make([]string, len(x))
    for i, val := range index {
        result[val] = fmt.Sprintf("%.2f", x[i])
    }
    fmt.Println("tested part took:", time.Now().Sub(start))
}

两个程序的附加代码块相同

package main

import "math/rand"

type matrix [][]float64
type vector []float64

func CreateRandomMatrix(n int) matrix {
    m := make(matrix, n)
    for i := 0; i < n; i++ {
        m[i] = make(vector, n)
        for j := 0; j < n; j++ {
            m[i][j] = float64(rand.Intn(100))
        }
    }
    return m
}

func CreateRandomVector(n int) vector {
    v := make(vector, n)
    for i := 0; i < n; i++ {
        v[i] = float64(rand.Intn(100))
    }
    return v
}

所以。这是问题所在:

理论上,第二个程序应该运行得更快,因为一些计算分布在处理器内核上。但这不会发生。每次添加并行元素时,第二个程序都会开始变慢。

我测试了 N 的大值以及小值。第二版程序的运行时间明显落后于第一版。比如你设置N=3500,执行的时间差大概是10秒左右。

此外,如果您将工作人员的数量设置为 1,则第二个程序开始运行得更快。

为什么会这样?我在某个地方犯了错误吗?如何让分布式计算加速程序?

开始版本: 1.14。但我也在 1.13 版本上检查了这段代码。

添加:我发现如果程序使用大矩阵大小,那么并行版本开始赶上顺序版本。

编辑摘要:在第二个程序中,在计算kkmax 的地方删除了一个具有并行性的部分,以消除数据竞争。

【问题讨论】:

  • 您将并行性和并发性混为一谈。在当前版本的 Go 中,goroutines 不是可抢占的,因此像你这样具有紧密计算循环的 goroutines 不会屈服于其他 goroutines。您的程序可能仍会受益于多核,但这取决于这些 goroutine 的调度方式。
  • 另请注意,您的并行版本存在数据竞争。使用--race 运行它以查看它们。允许编译器重写你的代码,这样最大检查就不会像你期望的那样工作,因为它可以假设每个 goroutine 在访问 maxElemInRowkk 时是完全独立的,并相应地进行优化。 (比如它可以假设maxElemInRow后面不能改,所以如果它只是写它,它就不必读它。所以goroutine A可以写5,然后goroutine B写10,然后goroutine A写 7,因为它超过 5。)
  • @BurakSerdar:“在当前版本的 Go 中,goroutines 不可抢占,” False。 Goroutines 现在是异步可抢占的。 golang.org/doc/go1.14
  • @peterSO 你是对的,这里仍然是 1.13,是时候升级了。虽然 OP 没有指定版本...
  • 您假设您受 CPU 限制。你可能记忆力有限。

标签: go concurrency


【解决方案1】:

您在这里为非常少量的工作设置了很多复杂性:

    for j := 0; j < len(a[i]); j++ {
        a[i][index[j]] /= r
    }

这需要 N 个分区,因此在您的示例中大约为 1000 个。加上一些簿记。

你用这个替换它:

            start := len(a[i]) / workers * w
            end := len(a[i]) / workers * (w + 1)

            if end > len(a[i]) {
                end = len(a[i])
            }

            for j := start; j < end; j++ {
                a[i][index[j]] /= r
            }

每行执行 125 次除法 (1000/8),再加上 2 次额外除法加上 2 次额外乘法,再加上额外加法和额外减法(&gt; 就是这样完成的)。其他记账方法同理。所以在你开始之前就有 3% 的计算开销。这在并发工作中很正常;我只是提醒你,并发工作总是从一个它必须自己挖出来的洞开始的。

每行添加 8 个 goroutine 创建和 16 个 WaitGroup 操作(加上一个等待)。那是创建和销毁的 1000 个 goroutine。 Goroutines 很便宜,但它们并不那么便宜。在你运行一千次的循环中,它们绝对不便宜。

这是为了加快一段代码的速度,根据我的测量,这是你循环的 0.05%。您正在根据整个运行检查时间(包括首先创建矩阵)。但是当我测试主外循环(N=1000)时,大约是 20ms(串行)。除法环路约为 10µs。除法循环不是跨 CPU 传播的有趣部分。只存在 10µs 的 goroutine 并不能很好地利用资源。

您提到“在执行此操作之前,我使用 C# 代码打开了相应的文献,其中提供了并行化此程序的建议,结果显示速度显着提高。”这看起来在 C# 中也不会很快。如果您每行创建 8 个任务来进行除法,我预计这会遇到与 Go 相同的问题。

这里可能还存在内存局部性问题。与在内存区域之间跳转、使 CPU 缓存无效并浪费部分缓存行相比,将内存从 RAM 中分块取出并按顺序对其进行操作要高效得多。 (但我怀疑这被生成和销毁 8,000 个 goroutine 的成本所压倒。)

我对并发代码的速度感到非常惊讶。

与其选择单个的、小的位在内核之间并行运行,您要做的是构建您的算法,以便您可以将大量工作分配给少数工作人员,而不是将少量工作分配给大量工作工人。

【讨论】:

  • 感谢详细的分析和解释,现在一切都清楚了!我认为问题已解决。
【解决方案2】:

所以,感谢所有帮助我并向我解释我错了什么的人。我查看了代码并找到了一个可以通过并行计算来优化它的地方,并且由于这种方法在这个地方的有效性而真正提高了速度。

我之前尝试并行化的所有时刻,我都回到了最初的顺序状态。

我使用并行方法替换的代码带有注释。

代码如下:

package main

import (
    "fmt"
    "math"
    "os"
    "sync"
    "time"
)

const (
    workers = 8
)

var wg sync.WaitGroup

func main() {
    N := 3000

    a := CreateRandomMatrix(N)
    b := CreateRandomVector(N)
    start := time.Now()

    index := make([]int, len(a))
    for i := range index {
        index[i] = i
    }

    for i := 0; i < len(a); i++ {

        r := a[i][index[i]]

        var kk int
        var max float64

        for k := i; k < len(a); k++ {
            if math.Abs(a[i][index[k]]) > max {
                kk = k
                max = math.Abs(a[i][index[k]])
            }
        }

        index[i], index[kk] = index[kk], index[i]

        r = a[i][index[i]]

        if r == 0 {
            if b[i] == 0 {
                fmt.Println("a lot of solutions")
            } else {
                fmt.Println("no solutions")
            }
            os.Exit(1)
        }

        for j := 0; j < len(a[i]); j++ {
            a[i][index[j]] /= r
        }
        b[i] /= r

        //concurrency
        chunk := len(a) / workers
        for w := 0; w < workers; w++ {
            wg.Add(1)
            go func(start int) {
                end := start + chunk
                if end > len(a) {
                    end = len(a)
                }
                for k := start; k < end; k++ {
                    r := a[k][index[i]]

                    for j := 0; j < len(a[i]); j++ {
                        a[k][index[j]] = a[k][index[j]] - a[i][index[j]]*r
                    }
                    b[k] = b[k] - b[i]*r
                }
                wg.Done()
            }(w*chunk + i + 1)

        }
        wg.Wait()

    }

    var x vector = make(vector, len(b))

    for i := len(a) - 1; i >= 0; i-- {
        x[i] = b[i]

        for j := i + 1; j < len(a); j++ {
            x[i] = x[i] - (x[j] * a[i][index[j]])
        }
    }

    result := make([]string, len(x))
    for i, val := range index {
        result[val] = fmt.Sprintf("%.2f", x[i])
    }
    fmt.Println(time.Since(start))
}

在矩阵大小为 N = 3000 的情况下,具有并发性的程序在 19 秒 内计算线性方程组,而通常的版本花费 43 秒! !

【讨论】:

    猜你喜欢
    • 2019-05-29
    • 1970-01-01
    • 2018-03-11
    • 2010-11-17
    • 2020-05-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-30
    相关资源
    最近更新 更多