【问题标题】:Algorithm for N-way mergeN路合并算法
【发布时间】:2011-06-30 15:59:04
【问题描述】:

2 路合并作为 Mergesort 算法的一部分被广泛研究。 但我有兴趣找出执行 N 路合并的最佳方式?

可以说,我有 N 文件,每个文件都对 100 万个整数进行了排序。 我必须将它们合并到 1 个单个文件中,该文件将包含 1 亿个已排序的整数。

请记住,这个问题的用例实际上是基于磁盘的外部排序。因此,在实际场景中也会存在内存限制。因此,一次合并 2 个文件(99 次)的幼稚方法是行不通的。假设每个数组只有一个小的可用内存滑动窗口。

我不确定是否已经有针对这种 N 路合并的标准化解决方案。 (谷歌搜索并没有告诉我太多)

但是如果你知道一个好的n路合并算法,请发布算法/链接。

时间复杂度:如果我们大大增加要合并的文件 (N) 的数量,这将如何影响算法的时间复杂度?

感谢您的回答。

我在任何地方都没有被问到这个问题,但我觉得这可能是一个有趣的面试问题。因此被标记。

【问题讨论】:

  • 为什么文件的成对合并不起作用?
  • 为了记录,这被称为平衡线或k-way合并。平衡线算法通常具有 O(kn) 时间复杂度,其中 k 是文件数,n 是项目总数,而堆 k 路合并通常是 O(n log k)。两种算法都有 O(k) 空间复杂度。
  • @paul,好的,我必须承认我错了,因为空间考虑,一次合并 2 个文件是行不通的。但我认为这将是一个非常缓慢的算法,这就是它不起作用的原因。
  • @Bavarious 你能说一下为什么你认为像这样合并是 O(kn)。在我看来它是 O(n log k) (因为合并每一对是 O(n) 并且你必须这样做 O(log k) 次才能减少到一个文件)。它也只使用 O(1) 空间。
  • @PaulHankin Balance 行保留一个未排序的数组(而不是堆),其中包含从每个输入文件读取的最后一个键,因此 k 在 O(kn) 和 O(k) 中。对于小k来说已经足够了。

标签: algorithm merge


【解决方案1】:

一个简单的想法是保留要合并的范围的优先级队列,其存储方式是首先从队列中删除具有最小第一个元素的范围。然后,您可以按如下方式进行 N 路合并:

  1. 将所有范围插入优先级队列,不包括空范围。
  2. 当优先级队列不为空时:
    1. 从队列中取出最小的元素。
    2. 将此范围的第一个元素附加到输出序列。
    3. 如果它不为空,则将序列的其余部分重新插入优先级队列。

该算法的正确性本质上是对 2 路合并正确工作的证明的概括——如果你总是从任何范围添加最小的元素,并且所有范围都被排序,那么你最终得到的序列是整体排序。

该算法的运行时复杂度如下。设 M 为所有序列中元素的总数。如果我们使用二叉堆,那么我们最多从优先级队列中进行 O(M) 次插入和 O(M) 次删除,因为对于写入输出序列的每个元素,都有一个出队来提取最小序列,然后是enqueue 将序列的其余部分放回队列中。这些步骤中的每一个都需要 O(lg N) 操作,因为从包含 N 个元素的二叉堆中插入或删除需要 O(lg N) 时间。这给出了 O(M lg N) 的净运行时间,它随着输入序列的数量呈线性增长。

可能有一种方法可以更快地实现这一点,但这似乎是一个很好的解决方案。内存使用量为 O(N),因为我们需要 O(N) 的二进制堆开销。如果我们通过存储指向序列的指针而不是序列本身来实现二进制堆,那么这应该不是太大的问题,除非你有一个真正荒谬的序列要合并。在这种情况下,只需将它们合并到适合内存的组中,然后合并所有结果。

希望这会有所帮助!

【讨论】:

    【解决方案2】:

    下面的想法怎么样:

    1. 创建优先级队列

    2. 遍历每个文件f
      1. 使用第一个值作为优先键将 (nextNumberIn(f​​), f) 入队

    3. 当队列不为空时
      1. 队列头(m, f)出列
      2. 输出m
      3. 如果 f 没有耗尽
        1. 入队(nextNumberIn(f​​), f)

    由于将元素添加到优先级队列可以在对数时间内完成,因此第 2 项是 O(N × log N)。由于 while 循环的(几乎所有)迭代都添加了一个元素,因此整个 while 循环为 O(M × log N),其中 M 是要返回的数字的总数排序。

    假设所有文件都有一个非空的数字序列,我们有 M > N,因此整个算法应该是 O(M × log N)

    【讨论】:

    • 非常整洁。我在考虑类似的路线,除了我没有想到数字与文件的配对。谢谢。接受。
    • @aioobe:您的解决方案很简洁,但是您进行了很多读取调用,这会降低效率。例如,在第 3 项中,对于每个 while 迭代,您对 f 中的下一个元素进行读取调用。我认为如果您将 if 条件更改为以下内容会更好:如果 f 不存在于队列中并且 f 未耗尽。这样,您将仅在队列中不存在 f 元素时进行读取。此外,当您进行此读取时,您可以一次读取 f 中的一大块数字。
    • "如果 f 不在队列中",它在出队后永远不会出现,因为队列中的每个文件总是最多有一个值。关于您的第二条评论:您的建议并没有提高复杂性(除了它使答案更难阅读!)此外,请记住这是伪代码。它可以很好地用一些缓冲读取器来实现,它读取更大的块并缓存它们。
    • @Programmer,我认为您对读取次数有一个很好的看法,但是您实际上不必实现“如果 f 不在队列中”;您可以简单地缓冲 f 并按原样使用 aioobe 的算法,通过缓冲区读取 f 的值。
    • @RestlessC0bra,不,第二步在每个文件中插入第一个数字。在第 3 步(while 循环)中,一个值被出列,并且该文件中的下一个值被入列(除非该文件已耗尽)。还不清楚吗?
    【解决方案3】:

    搜索“多相合并”,查看经典作品 - Donald Knuth 和 E.H.Friend。

    此外,您可能想看看 Seyedafsari 和 Hasanzadeh 提出的 Smart Block Merging,与之前的建议类似,它使用优先级队列。

    另一个有趣的原因是 Kim & Kutzner 的 In Place Merging Algorithm

    我也推荐 Vitter 的这篇论文:External memory algorithms and data structures: dealing with massive data

    【讨论】:

    • 您的智能区块合并链接错误。这是一篇关于爱沙尼亚供应链的文章。
    • 杰里米,感谢您指出。 2012 年,waset.org 主持人似乎用新的会议记录覆盖了这些文件(最初于 2010 年发布)。我无法找到旧文章。如果有人有,请张贴/链接。
    【解决方案4】:

    http://en.wikipedia.org/wiki/External_sorting。这是我对基于堆的 k-way 合并的看法,使用来自源的缓冲读取来模拟 I/O 减少:

    public class KWayMerger<T>
    {
        private readonly IList<T[]> _sources;
        private readonly int _bufferSize;
        private readonly MinHeap<MergeValue<T>> _mergeHeap;
        private readonly int[] _indices;
    
        public KWayMerger(IList<T[]> sources, int bufferSize, Comparer<T> comparer = null)
        {
            if (sources == null) throw new ArgumentNullException("sources");
    
            _sources = sources;
            _bufferSize = bufferSize;
    
            _mergeHeap = new MinHeap<MergeValue<T>>(
                          new MergeComparer<T>(comparer ?? Comparer<T>.Default));
            _indices = new int[sources.Count];
        }
    
        public T[] Merge()
        {
            for (int i = 0; i <= _sources.Count - 1; i++)
                AddToMergeHeap(i);
    
            var merged = new T[_sources.Sum(s => s.Length)];
            int mergeIndex = 0;
    
            while (_mergeHeap.Count > 0)
            {
                var min = _mergeHeap.ExtractDominating();
                merged[mergeIndex++] = min.Value;
                if (min.Source != -1) //the last item of the source was extracted
                    AddToMergeHeap(min.Source);
            }
    
            return merged;
        }
    
        private void AddToMergeHeap(int sourceIndex)
        {
            var source = _sources[sourceIndex];
            var start = _indices[sourceIndex];
            var end = Math.Min(start + _bufferSize - 1, source.Length - 1);
    
            if (start > source.Length - 1)
                return; //we're done with this source
    
            for (int i = start; i <= end - 1; i++)
                _mergeHeap.Add(new MergeValue<T>(-1, source[i]));   
    
            //only the last item should trigger the next buffered read
            _mergeHeap.Add(new MergeValue<T>(sourceIndex, source[end]));
    
            _indices[sourceIndex] += _bufferSize; //we may have added less items, 
            //but if we did we've reached the end of the source so it doesn't matter
        } 
    }
    
    internal class MergeValue<T>
    {
        public int Source { get; private set; }
        public T Value { get; private set; }
    
        public MergeValue(int source, T value)
        {
            Value = value;
            Source = source;
        }
    }
    
    internal class MergeComparer<T> : IComparer<MergeValue<T>>
    {
        public Comparer<T> Comparer { get; private set; }
    
        public MergeComparer(Comparer<T> comparer)
        {
            if (comparer == null) throw new ArgumentNullException("comparer");
            Comparer = comparer;
        }
    
        public int Compare(MergeValue<T> x, MergeValue<T> y)
        {
            Debug.Assert(x != null && y != null);
            return Comparer.Compare(x.Value, y.Value);
        }
    }
    

    Here is one possible implementation of MinHeap&lt;T&gt;。一些测试:

    [TestMethod]
    public void TestKWaySort()
    {
        var rand = new Random();
        for (int i = 0; i < 10; i++)
            AssertKwayMerge(rand);
    }
    
    private static void AssertKwayMerge(Random rand)
    {
        var sources = new[]
            {
                GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(),
                GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(),
                GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(),
                GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(),
            };
        Assert.IsTrue(new KWayMerger<int>(sources, 20).Merge().SequenceEqual(sources.SelectMany(s => s).OrderBy(i => i)));
    }
    
    public static IEnumerable<int> GenerateRandomCollection(Random rand, int minLength, int maxLength, int min = 0, int max = int.MaxValue)
    {
        return Enumerable.Repeat(0, rand.Next(minLength, maxLength)).Select(i => rand.Next(min, max));
    }
    

    【讨论】:

    • 你的代码是什么语言? (抱歉,我是编程新手;我正在寻找 Java 解决方案)。
    • @Hengameh 它是 C#。语法与 Java 非常相似,所以翻译起来应该不会太难。
    【解决方案5】:

    合并 k 个已排序数组(每个长度为 n)的简单方法需要 O(n k^2) 时间而不是 O(nk) 时间。当你合并前 2 个数组时,它需要 2n 时间,然后当你将第三个与输出合并时,它需要 3n 时间,因为现在我们正在合并两个长度为 2n 和 n 的数组。现在,当我们将此输出与第四个合并时,此合并需要 4n 时间。因此,最后一次合并(当我们将第 k 个数组添加到已排序的数组中时)需要 k*n 时间。因此所需的总时间为 2n+ 3n + 4n +...k*n 这是 O(nk^2)。

    看起来我们可以在 O(kn) 时间内完成,但事实并非如此,因为每次我们要合并的数组的大小都会增加。
    尽管我们可以使用分而治之来实现更好的界限。我仍在努力,如果找到解决方案,我会发布解决方案。

    【讨论】:

      【解决方案6】:

      我编写了这段 STL 风格的代码,它执行 N 路合并,并认为我会将其发布在此处以帮助防止其他人重新发明轮子。 :)

      警告:它只是经过轻微测试。使用前进行测试。 :)

      你可以这样使用它:

      #include <vector>
      
      int main()
      {
          std::vector<std::vector<int> > v;
          std::vector<std::vector<int>::iterator> vout;
          std::vector<int> v1;
          std::vector<int> v2;
          v1.push_back(1);
          v1.push_back(2);
          v1.push_back(3);
          v2.push_back(0);
          v2.push_back(1);
          v2.push_back(2);
          v.push_back(v1);
          v.push_back(v2);
          multiway_merge(v.begin(), v.end(), std::back_inserter(vout), false);
      }
      

      它还允许使用成对的迭代器而不是容器本身。

      如果您使用 Boost.Range,您可以删除一些样板代码。

      代码:

      #include <algorithm>
      #include <functional>  // std::less
      #include <iterator>
      #include <queue>  // std::priority_queue
      #include <utility>  // std::pair
      #include <vector>
      
      template<class OutIt>
      struct multiway_merge_value_insert_iterator : public std::iterator<
          std::output_iterator_tag, OutIt, ptrdiff_t
      >
      {
          OutIt it;
          multiway_merge_value_insert_iterator(OutIt const it = OutIt())
              : it(it) { }
      
          multiway_merge_value_insert_iterator &operator++(int)
          { return *this; }
      
          multiway_merge_value_insert_iterator &operator++()
          { return *this; }
      
          multiway_merge_value_insert_iterator &operator *()
          { return *this; }
      
          template<class It>
          multiway_merge_value_insert_iterator &operator =(It const i)
          {
              *this->it = *i;
              ++this->it;
              return *this;
          }
      };
      
      template<class OutIt>
      multiway_merge_value_insert_iterator<OutIt>
          multiway_merge_value_inserter(OutIt const it)
      { return multiway_merge_value_insert_iterator<OutIt>(it); };
      
      template<class Less>
      struct multiway_merge_value_less : private Less
      {
          multiway_merge_value_less(Less const &less) : Less(less) { }
          template<class It1, class It2>
          bool operator()(
              std::pair<It1, It1> const &b /* inverted */,
              std::pair<It2, It2> const &a) const
          {
              return b.first != b.second && (
                  a.first == a.second ||
                  this->Less::operator()(*a.first, *b.first));
          }
      };
      
      struct multiway_merge_default_less
      {
          template<class T>
          bool operator()(T const &a, T const &b) const
          { return std::less<T>()(a, b); }
      };
      
      template<class R>
      struct multiway_merge_range_iterator
      { typedef typename R::iterator type; };
      
      template<class R>
      struct multiway_merge_range_iterator<R const>
      { typedef typename R::const_iterator type; };
      
      template<class It>
      struct multiway_merge_range_iterator<std::pair<It, It> >
      { typedef It type; };
      
      template<class R>
      typename R::iterator multiway_merge_range_begin(R &r)
      { return r.begin(); }
      
      template<class R>
      typename R::iterator multiway_merge_range_end(R &r)
      { return r.end(); }
      
      template<class R>
      typename R::const_iterator multiway_merge_range_begin(R const &r)
      { return r.begin(); }
      
      template<class R>
      typename R::const_iterator multiway_merge_range_end(R const &r)
      { return r.end(); }
      
      template<class It>
      It multiway_merge_range_begin(std::pair<It, It> const &r)
      { return r.first; }
      
      template<class It>
      It multiway_merge_range_end(std::pair<It, It> const &r)
      { return r.second; }
      
      template<class It, class OutIt, class Less, class PQ>
      OutIt multiway_merge(
          It begin, It const end, OutIt out, Less const &less,
          PQ &pq, bool const distinct = false)
      {
          while (begin != end)
          {
              pq.push(typename PQ::value_type(
                  multiway_merge_range_begin(*begin),
                  multiway_merge_range_end(*begin)));
              ++begin;
          }
          while (!pq.empty())
          {
              typename PQ::value_type top = pq.top();
              pq.pop();
              if (top.first != top.second)
              {
                  while (!pq.empty() && pq.top().first == pq.top().second)
                  { pq.pop(); }
                  if (!distinct ||
                      pq.empty() ||
                      less(*pq.top().first, *top.first) ||
                      less(*top.first, *pq.top().first))
                  {
                      *out = top.first;
                      ++out;
                  }
      
                  ++top.first;
                  pq.push(top);
              }
          }
          return out;
      }
      
      template<class It, class OutIt, class Less>
      OutIt multiway_merge(
          It const begin, It const end, OutIt out, Less const &less,
          bool const distinct = false)
      {
          typedef typename multiway_merge_range_iterator<
              typename std::iterator_traits<It>::value_type
          >::type SubIt;
          if (std::distance(begin, end) < 16)
          {
              typedef std::vector<std::pair<SubIt, SubIt> > Remaining;
              Remaining remaining;
              remaining.reserve(
                  static_cast<size_t>(std::distance(begin, end)));
              for (It i = begin; i != end; ++i)
              {
                  if (multiway_merge_range_begin(*i) !=
                      multiway_merge_range_end(*i))
                  {
                      remaining.push_back(std::make_pair(
                          multiway_merge_range_begin(*i),
                          multiway_merge_range_end(*i)));
                  }
              }
              while (!remaining.empty())
              {
                  typename Remaining::iterator smallest =
                      remaining.begin();
                  for (typename Remaining::iterator
                      i = remaining.begin();
                      i != remaining.end();
                  )
                  {
                      if (less(*i->first, *smallest->first))
                      {
                          smallest = i;
                          ++i;
                      }
                      else if (distinct && i != smallest &&
                          !less(
                              *smallest->first,
                              *i->first))
                      {
                          i = remaining.erase(i);
                      }
                      else { ++i; }
                  }
                  *out = smallest->first;
                  ++out;
                  ++smallest->first;
                  if (smallest->first == smallest->second)
                  { smallest = remaining.erase(smallest); }
              }
              return out;
          }
          else
          {
              std::priority_queue<
                  std::pair<SubIt, SubIt>,
                  std::vector<std::pair<SubIt, SubIt> >,
                  multiway_merge_value_less<Less>
              > q((multiway_merge_value_less<Less>(less)));
              return multiway_merge(begin, end, out, less, q, distinct);
          }
      }
      
      template<class It, class OutIt>
      OutIt multiway_merge(
          It const begin, It const end, OutIt const out,
          bool const distinct = false)
      {
          return multiway_merge(
              begin, end, out,
              multiway_merge_default_less(), distinct);
      }
      

      【讨论】:

        【解决方案7】:
        Here is my implementation using MinHeap...
        
        package merging;
        
        import java.io.BufferedReader;
        import java.io.BufferedWriter;
        import java.io.File;
        import java.io.FileReader;
        import java.io.FileWriter;
        import java.io.IOException;
        import java.io.PrintWriter;
        
        
        public class N_Way_Merge {
        
        int No_of_files=0;
        String[] listString;
        int[] listIndex;
        PrintWriter pw;
        private String fileDir = "D:\\XMLParsing_Files\\Extracted_Data";
        private File[] fileList;
        private BufferedReader[] readers;
        
        public static void main(String[] args) throws IOException {
        
            N_Way_Merge nwm=new N_Way_Merge();
        
            long start= System.currentTimeMillis();
        
            try {
        
                nwm.createFileList();
        
                nwm.createReaders();
                nwm.createMinHeap();
            }
            finally {
                nwm.pw.flush();
                nwm.pw.close();
                for (BufferedReader readers : nwm.readers) {
        
                    readers.close();
        
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("Files merged into a single file.\nTime taken: "+((end-start)/1000)+"secs");
        }
        
        public void createFileList() throws IOException {
            //creates a list of sorted files present in a particular directory
            File folder = new File(fileDir);
            fileList = folder.listFiles();
            No_of_files=fileList.length;
            assign();
            System.out.println("No. of files - "+ No_of_files);
        
        }
        
        public void assign() throws IOException
        {
            listString = new String[No_of_files];
            listIndex = new int[No_of_files];
            pw = new PrintWriter(new BufferedWriter(new FileWriter("D:\\XMLParsing_Files\\Final.txt", true)));
        }
        
        public void createReaders() throws IOException {
            //creates array of BufferedReaders to read the files
            readers = new BufferedReader[No_of_files];
        
            for(int i=0;i<No_of_files;++i)
            {
                readers[i]=new BufferedReader(new FileReader(fileList[i]));
            }
        }
        
        public void createMinHeap() throws IOException {
        
            for(int i=0;i<No_of_files;i++)
            {
                listString[i]=readers[i].readLine();
                listIndex[i]=i;
            }
        
            WriteToFile(listString,listIndex);
        
        }
        
        public void WriteToFile(String[] listString,int[] listIndex) throws IOException{
        
            BuildHeap_forFirstTime(listString, listIndex);
            while(!(listString[0].equals("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz")))
            {
                pw.println(listString[0]);
                listString[0]=readers[listIndex[0]].readLine();
        
                MinHeapify(listString,listIndex,0);
            }
        
        }
        public void BuildHeap_forFirstTime(String[] listString,int[] listIndex){
        
            for(int i=(No_of_files/2)-1;i>=0;--i)
                MinHeapify(listString,listIndex,i);
        
        }
        
        public void MinHeapify(String[] listString,int[] listIndex,int index){
        
            int left=index*2 + 1;
            int right=left + 1;
            int smallest=index;
            int HeapSize=No_of_files;
            if(left <= HeapSize-1  && listString[left]!=null &&  (listString[left].compareTo(listString[index])) < 0)
                smallest = left;
        
            if(right <= HeapSize-1 && listString[right]!=null &&  (listString[right].compareTo(listString[smallest])) < 0)
                smallest=right;
        
        
        
            if(smallest!=index)
            {
                String temp=listString[index];
                listString[index]=listString[smallest];
                listString[smallest]=temp;
        
                listIndex[smallest]^=listIndex[index];
                listIndex[index]^=listIndex[smallest];
                listIndex[smallest]^=listIndex[index];
        
                MinHeapify(listString,listIndex,smallest);
            }
        
        }
        

        }

        【讨论】:

          【解决方案8】:

          用于合并k个排序数组的最小堆算法的Java实现:

          public class MergeKSorted {
          
          /**
           * helper object to store min value of each array in a priority queue, 
           * the kth array and the index into kth array
           *
           */
          static class PQNode implements Comparable<PQNode>{
              int value;
              int kth = 0;
              int indexKth = 0;
          
              public PQNode(int value, int kth, int indexKth) {
                  this.value = value;
                  this.kth = kth;
                  this.indexKth = indexKth;
              }
              @Override
              public int compareTo(PQNode o) {
                  if(o != null) {
                      return Integer.valueOf(value).compareTo(Integer.valueOf(o.value));
                  }
                  else return 0;
              }
          
              @Override
              public String toString() {
                  return value+" "+kth+" "+indexKth;
              }
          }
          public static void mergeKSorted(int[][] sortedArrays) {
              int k = sortedArrays.length;
              int resultCtr = 0;
              int totalSize = 0;
              PriorityQueue<PQNode> pq = new PriorityQueue<>();
              for(int i=0; i<k; i++) {
                  int[] kthArray = sortedArrays[i];
                  totalSize+=kthArray.length;
                  if(kthArray.length > 0) {
                      PQNode temp = new PQNode(kthArray[0], i, 0);
                      pq.add(temp); 
                  }
              }
              int[] result = new int[totalSize];
              while(!pq.isEmpty()) {
                  PQNode temp = pq.poll();
                  int[] kthArray = sortedArrays[temp.kth];
                  result[resultCtr] = temp.value;
                  resultCtr++;            
                  temp.indexKth++;
                  if(temp.indexKth < kthArray.length) {
                      temp = new PQNode(kthArray[temp.indexKth], temp.kth, temp.indexKth);
                      pq.add(temp);
                  }
          
              }
              print(result);
          }
          
          public static void print(int[] a) {
              StringBuilder sb = new StringBuilder();
              for(int v : a) {
                  sb.append(v).append(" ");
              }
              System.out.println(sb);
          }
          
          public static void main(String[] args) {
               int[][] sortedA = {
                   {3,4,6,9},
                   {4,6,8,9,12},
                   {3,4,9},
                   {1,4,9}    
               };
               mergeKSorted(sortedA);
          }
          
          }
          

          【讨论】:

            猜你喜欢
            • 2023-03-24
            • 2011-01-14
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2014-02-09
            • 2015-06-16
            • 2021-12-10
            相关资源
            最近更新 更多