【问题标题】:Finding Primes Using Parallel使用并行查找素数
【发布时间】:2012-09-29 19:08:13
【问题描述】:

所以我创建了以下方法来查找不超过某个数字的所有素数。关于如何加快速度的任何建议?

我这样称呼它;

interval = (value + NOOFTHREADS - 1) / NOOFTHREADS;
int max = interval * NOOFTHREADS;

tickets = new List<int>(NOOFTHREADS);
for (int i = 1; i <= NOOFTHREADS; i++)
{
    tickets.Add(i * (max / NOOFTHREADS));
}

Enumerable.Range(1, NOOFTHREADS)
.AsParallel()
.ForAll(_ => findPrimes());

带有一些全局变量;

static List<int> vals = new List<int>();
static List<int> tickets;
static int interval = new int();

以及方法;

public static void findPrimes()
    {
        int myTicket;
        lock (tickets)
        {
            myTicket = (int)tickets.Last();
            tickets.RemoveAt(tickets.Count - 1);
        }
        var max = myTicket;
        int min = max - interval +1;
        int num;

        var maxSquareRoot = Math.Sqrt(max);
        var eliminated = new System.Collections.BitArray(max + 1);
        eliminated[0] = true;
        eliminated[1] = true;
        for (int i = 2; i < (max) / 2; i++)
        {
            if (!eliminated[i])
            {
                if (i < maxSquareRoot)
                {
                    num = ((min + i -1 )/i)*i;

                    if (num == i)
                        num = num + i;

                    for (int j =num; j <= max; j += i)
                        eliminated[j] = true;
                }
            }
        }
        for (int b = (int)min; b < max; b++)
        {
            if (!eliminated[b])
                lock(vals)
                    vals.Add(b);
        }
    }

【问题讨论】:

  • 如果您的编辑是为了表明您希望进一步加快速度,那应该是一个新问题。然而,正常的步骤是 1. 从筛子中消除偶数,这是一个大约 2 的因子,很简单; 2. 也消除 3 的倍数,这是大约 1.5 的进一步因子,并且仍然相对容易; 3. 消除更多小素数的倍数,随着回报的减少越来越困难 - 或使用由其他人认真调整的库,例如Kim Walisch 的 primeSieve (C++) 或 Daniel Bernstein 的 primegen (C)。

标签: c# math parallel-processing primes


【解决方案1】:

是否有理由实施教科书数学可读平方根算法? 看看Prime Number Algorithm

【讨论】:

  • 这应该是一个评论,而不是真正回答你的问题。
  • 埃拉托色尼筛法不利于并行化。
  • 您的链接似乎与我正在做的事情相距一百万英里。虽然我不知道 c.我如何将该 c 示例应用到我的代码中?
  • 忽略我的回答。花费者有一个很好的观点。该算法(由 peoro 建议)按顺序从 2 到设定的限制。没有简单的方法可以从不同位置开始启动多个踏板。该算法(据我所见)具有 O(n) 复杂度,因此扔给您似乎是件好事。你不需要 c 。我的回答是愚蠢的,我不能真正帮助你解决你的问题,对不起。
【解决方案2】:

已经有很多关于并行代码与串行代码性能的文档。 Microsoft 建议您在将代码转换为使用并行架构之前阅读他们关于使用并行功能的文档。

考虑阅读我在此处发布的以下问答:

Nested Parallel.ForEach loops

Performance profiling in .NET

【讨论】:

  • 您是说(根据您的第二个链接)我的结果可能不够准确吗?
  • 这取决于您如何测试串行与并行循环的性能。如果您使用 .NET 的 Stopwatch 类进行测试,您的结果将不会完全准确。为了获得最佳准确性,您需要通过探查器运行代码。并行循环并不总是适用于所有场景,有时串行循环会更快!
  • 作为一个想法,您可以使用 Stopwatch,并多次迭代您的代码,将所有结果加在一起,然后除以迭代次数,这将为您提供粗略的平均执行次数时间。将分析基于第一次运行并不理想的原因是,第一次运行时,您的代码是 JIT 的,因此您需要补偿 JIT 开销。
【解决方案3】:

我认为你所有的锁都可能有问题,你应该尽量避免使用锁,它们非常昂贵。我不知道你算法的细节,但我认为你应该尝试以某种方式移除锁。票可以输入吗?他们可以有自己的输出队列,你可以在全部完成后合并?

【讨论】:

  • 我想知道锁是否是问题所在。不过想不出办法绕过他们。当它们都开始相同的方法时,我将如何将不同的数据输入到每个线程的方法中?
  • 据我所知,您将线程数作为输入,然后将输入丢弃。你应该在你的票列表上调用 .AsParallel() 。我认为拥有固定数量的线程不是 PLINQ 的工作
【解决方案4】:

Eratosthenes 的筛子可以相当容易地并行化,您只需将其分成单独的块并单独筛选每个块。你已经开始了分裂,但还没有走得足够远以获得好的结果。看看findPrimes()有什么问题

var max = myTicket;
int min = max - interval +1;
int num;

var maxSquareRoot = Math.Sqrt(max);
var eliminated = new System.Collections.BitArray(max + 1);

您为每个线程创建一个新的BitArray,涵盖从 0 到 max 的所有数字。对于筛选第一个块的线程来说,这很好,但对于后面的线程,您分配的内存比需要的多得多。使用较高的上限和许多线程,这本身就是一个问题,您大约分配了 (NOOFTHREADS + 1) * limit / 2 位,而只需要大约 limit 位。对于更少的线程和/或更低的限制,你仍然会恶化局部性并且会有更多的缓存未命中。

eliminated[0] = true;
eliminated[1] = true;
for (int i = 2; i < (max) / 2; i++)

i &gt; maxSquareRoot 时应该停止外循环。然后循环体不再做任何有成效的事情,它只执行一次读取和一两次检查。每次迭代都不需要很长时间,但是如果max 是例如,从√maxmax 的所有i 都会加起来。 1011。仅对最后一个块执行此操作可能比单线程单块筛子花费更长的时间。

{
    if (!eliminated[i])

eliminated[i] 只能对i &gt;= min(或i &lt; 2)成立,这种情况只会在i &lt;= maxSquareRoot 的第一个块中遇到(除非限制非常低)。所以对于其他块,你也消除了 4, 6, 8, 9, 10, 12, 14, ... 的倍数。很多浪费的工作。

    {
        if (i < maxSquareRoot)

如果maxSquareRoot恰好是一个素数,你并没有消除它的平方,比较应该是&lt;=

        {
            num = ((min + i -1 )/i)*i;

            if (num == i)
                num = num + i;

            for (int j =num; j <= max; j += i)
                eliminated[j] = true;
        }
    }
}

现在,筛分完成后,您将逐步检查BitArray 的块

for (int b = (int)min; b < max; b++)
{
    if (!eliminated[b])
        lock(vals)
            vals.Add(b);
}

当你找到一个素数时,你锁定列表vals 并将素数添加到它。如果有两个或更多线程几乎同时完成筛分,它们将在那里相互踩踏,锁定和等待将进一步减慢进程。

为了减少空间使用,每个线程应该创建一个maxSquareRoot 的素数列表,并使用它来消除其块中的复合,因此BitArray 只需要max - min + 1 位。每个创建自己的列表的线程都会重复一些工作,但由于这里的上限很小,所以额外的工作并不多。我不知道如何处理并发读取访问,如果这不增加同步开销,您也可以只为所有线程使用一个列表,但我怀疑这会有所收获。

代码大致如下:

List<int> sievePrimes = simpleSieve(maxSquareRoot);
// simpleSieve is a standard SoE returning a list of primes not exceeding its argument
var sieve = new System.Collections.BitArray(max - min + 1);
int minSquareRoot = (int)Math.Sqrt(min);
foreach(int p in sievePrimes)
{
    int num = p > minSquareRoot ? p*p : ((min + p - 1)/p)*p;
    num -= min;
    for(; num <= max-min; num += p)
    {
        sieve[num] =true;
    }
}

现在,为了避免在将素数添加到列表时线程互相踩到脚趾,每个线程都应该创建自己的素数列表并在一个步骤中附加它(我不能 100% 确定这比添加更快每个素数都有自己的锁,但如果不是,我会感到惊讶)

List<int> primes = new List<int>();
for(int offset = 0; offset <= max-min; ++offset)
{
    if (!sieve[offset])
    {
        primes.Add(min + offset);
    }
}
lock(vals) vals.AddRange(primes);

(并且vals 的初始容量应约为预期的素数数量,以避免重新分配每个块)

【讨论】:

  • 我在 SO 上得到的最佳答案。我花了一段时间才明白一切,但你的解释很棒。谢谢。如果我有一个查询,那就是您的方法似乎返回 1 作为质数。这是故意的吗?在大多数情况下,我认为 1 被忽略了。
  • 不,这是一个疏忽。我习惯在 5 或 7 开始筛子,所以我不需要对 1 采取特殊的预防措施,抱歉。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-02-19
  • 1970-01-01
相关资源
最近更新 更多