【问题标题】:Fast sort partially sorted array快速排序部分排序数组
【发布时间】:2016-02-25 14:19:20
【问题描述】:

首先,它不是关于在我们开始排序之前具有可能按某种顺序排列的子序列的数组,而是关于特殊结构的数组。

我现在正在编写一个对数据进行排序的简单方法。到目前为止,我使用的是Array.Sort,但PLINQOrderBy 在大型阵列上的性能优于标准Array.Sort

所以我决定编写自己的多线程排序实现。想法很简单:在分区上拆分一个数组,对每个分区进行并行排序,然后将所有结果合并到一个数组中。

现在我已经完成了分区和排序:

public class PartitionSorter
{
    public static void Sort(int[] arr)
    {
        var ranges = Range.FromArray(arr);
        var allDone = new ManualResetEventSlim(false, ranges.Length*2);
        int completed = 0;
        foreach (var range in ranges)
        {
            ThreadPool.QueueUserWorkItem(r =>
            {
                var rr = (Range) r;
                Array.Sort(arr, rr.StartIndex, rr.Length);
                if (Interlocked.Increment(ref completed) == ranges.Length)
                    allDone.Set();
            }, range);
        }
        allDone.Wait();
    }
}

public class Range
{
    public int StartIndex { get; }
    public int Length { get; }

    public Range(int startIndex, int endIndex)
    {
        StartIndex = startIndex;
        Length = endIndex;
    }

    public static Range[] FromArray<T>(T[] source)
    {
        int processorCount = Environment.ProcessorCount;
        int partitionLength = (int) (source.Length/(double) processorCount);
        var result = new Range[processorCount];
        int start = 0;
        for (int i = 0; i < result.Length - 1; i++)
        {
            result[i] = new Range(start, partitionLength);
            start += partitionLength;
        }
        result[result.Length - 1] = new Range(start, source.Length - start);
        return result;
    }
}

结果我得到一个特殊结构的数组,例如

[1 3 5 | 2 4 7 | 6 8 9]

现在如何使用这些信息并完成排序?插入排序和其他排序不使用块中的数据已经排序的信息,我们只需要将它们合并在一起。我尝试应用来自Merge sort 的一些算法,但失败了。

【问题讨论】:

  • 由于您本质上是在进行合并排序,因此您应该继续朝那个方向发展!为什么没有实现 Merge-Sort?
  • @MrPaulch 因为很难实现in-place 合并排序。之前我用了一个qsort,虽然很到位,但是因为随机内存访问,性能比naive单线程差Array.Sort
  • 那么你应该让你的算法适应Quicksort - Array.Sort 实际上使用了 Introsort,它是 QuicksortHeapsort 的混合体 - 我曾经实现了一种高度专业化的快速排序算法,该算法的性能优于 Array.Sort,因为它知道必须排序的数据类型。
  • en.wikipedia.org/wiki/Timsort 在部分排序的集合上通常相当快... 算法找到已经排序的数据子集,并使用该知识更有效地对剩余部分进行排序
  • 如果一个或多个线程由于某种原因被阻塞,请记住线程数多于内核数会很有用。但是,如果所有线程都处于活动状态,那么由于额外的上下文切换,线程数多于内核数总是会减慢速度。我并不是说你永远不应该这样做。我只是在解释为什么多线程实现会比顺序实现慢。

标签: c# .net arrays algorithm sorting


【解决方案1】:

我已经使用并行快速排序实现进行了一些测试。

我在 Windows x64 10 上使用 RELEASE 构建测试了以下代码,使用 C#6 (Visual Studio 2015)、.Net 4.61 编译,并在任何调试器之外运行。

我的处理器是四核超线程(这肯定有助于任何并行实现!)

数组大小为 20,000,000(因此是一个相当大的数组)。

我得到了这些结果:

LINQ OrderBy()  took 00:00:14.1328090
PLINQ OrderBy() took 00:00:04.4484305
Array.Sort()    took 00:00:02.3695607
Sequential      took 00:00:02.7274400
Parallel        took 00:00:00.7874578

PLINQ OrderBy()LINQ OrderBy() 快​​得多,但比Array.Sort() 慢。

QuicksortSequential() 的速度与Array.Sort() 差不多

但有趣的是,QuicksortParallelOptimised() 在我的系统上明显更快 - 因此,如果您有足够的处理器内核,这绝对是一种有效的排序方式。

这是完整的可编译控制台应用程序。记得在 RELEASE 模式下运行它——如果你在 DEBUG 模式下运行它,计时结果将非常不正确。

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static void Main()
        {
            int n = 20000000;
            int[] a = new int[n];
            var rng = new Random(937525);

            for (int i = 0; i < n; ++i)
                a[i] = rng.Next();

            var b = a.ToArray();
            var d = a.ToArray();

            var sw = new Stopwatch();

            sw.Restart();
            var c = a.OrderBy(x => x).ToArray(); // Need ToArray(), otherwise it does nothing.
            Console.WriteLine("LINQ OrderBy() took " + sw.Elapsed);

            sw.Restart();
            var e = a.AsParallel().OrderBy(x => x).ToArray(); // Need ToArray(), otherwise it does nothing.
            Console.WriteLine("PLINQ OrderBy() took " + sw.Elapsed);

            sw.Restart();
            Array.Sort(d);
            Console.WriteLine("Array.Sort() took " + sw.Elapsed);

            sw.Restart();
            QuicksortSequential(a, 0, a.Length-1);
            Console.WriteLine("Sequential took " + sw.Elapsed);

            sw.Restart();
            QuicksortParallelOptimised(b, 0, b.Length-1);
            Console.WriteLine("Parallel took " + sw.Elapsed);

            // Verify that our sort implementation is actually correct!

            Trace.Assert(a.SequenceEqual(c));
            Trace.Assert(b.SequenceEqual(c));
        }

        static void QuicksortSequential<T>(T[] arr, int left, int right)
        where T : IComparable<T>
        {
            if (right > left)
            {
                int pivot = Partition(arr, left, right);
                QuicksortSequential(arr, left, pivot - 1);
                QuicksortSequential(arr, pivot + 1, right);
            }
        }

        static void QuicksortParallelOptimised<T>(T[] arr, int left, int right)
        where T : IComparable<T>
        {
            const int SEQUENTIAL_THRESHOLD = 2048;
            if (right > left)
            {
                if (right - left < SEQUENTIAL_THRESHOLD)
                {
                    QuicksortSequential(arr, left, right);
                }
                else
                {
                    int pivot = Partition(arr, left, right);
                    Parallel.Invoke(
                        () => QuicksortParallelOptimised(arr, left, pivot - 1),
                        () => QuicksortParallelOptimised(arr, pivot + 1, right));
                }
            }
        }

        static int Partition<T>(T[] arr, int low, int high) where T : IComparable<T>
        {
            int pivotPos = (high + low) / 2;
            T pivot = arr[pivotPos];
            Swap(arr, low, pivotPos);

            int left = low;
            for (int i = low + 1; i <= high; i++)
            {
                if (arr[i].CompareTo(pivot) < 0)
                {
                    left++;
                    Swap(arr, i, left);
                }
            }

            Swap(arr, low, left);
            return left;
        }

        static void Swap<T>(T[] arr, int i, int j)
        {
            T tmp = arr[i];
            arr[i] = arr[j];
            arr[j] = tmp;
        }
    }
}

【讨论】:

  • 我昨天尝试了这段代码,并且当数组已经排序时(例如,在new int[20*1000*1000]),AFAIR 这个枢轴非常糟糕
  • 只有幼稚的枢轴实现才能在没有cutting 每次调用一个元素的情况下获得有效结果。这就是我切换到合并排序的原因,它没有最坏的情况。好吧,现在的问题是支点。我会尝试自己解决。谢谢。
  • @AlexZhukovskiy 是的,看起来剩下的关键是一个不错的支点选择。
  • 我不确定我是否明白你在说什么。对于零数组,此枢轴将为每次调用返回一个左边界。所以会有 N 次枢轴调用,这足够慢。当然,在真正的不同数据上,这个枢轴工作得很好,但我想用这种排序作为通用排序方法。但是当在某些情况下性能下降到甚至低于冒泡排序的水平时,这并不好。我一直在寻找可以增强这种特殊情况的东西。只需在您使用 rng 的地方注释代码(第 3-6 行),您就会自己看到它
  • @AlexZhukovskiy 我希望你已经看过这个,但以防万一你还没有:en.wikipedia.org/wiki/Quicksort#Choice_of_pivot
猜你喜欢
  • 1970-01-01
  • 2015-06-28
  • 1970-01-01
  • 1970-01-01
  • 2018-05-27
  • 1970-01-01
  • 1970-01-01
  • 2013-08-25
  • 2020-12-20
相关资源
最近更新 更多