【问题标题】:Efficient algorithm for counting of duplicate elements in an array which doesn't fit into RAM用于计算不适合 RAM 的数组中重复元素的有效算法
【发布时间】:2017-01-15 13:19:13
【问题描述】:

我认为这个主题充分描述了这个问题。我需要计算无法完全加载到计算机内存中的数组中元素的重复次数。阵列可以有 50Gb 甚至更大的大小。在我的某些任务中,该数组的元素是长度不超过 256 个以 UTF-8 编码的字符(总共 512 个字节)的字符串。字符串的数量约为 1 亿。例如,如果我在输入上有以下数组(为简洁起见,字符串被缩短):

VERY NICE ELEMENT_1
VERY NICE ELEMENT_1
VERY NICE ELEMENT_2
VERY NICE ELEMENT_2
NOT SO GOOD ELEMENT
NOT SO GOOD ELEMENT
BAD ELEMENT
BAD ELEMENT
BAD ELEMENT
PERFECT FIFTH ELEMENT

算法应该输出以下内容(可能不完全按照那个顺序):

VERY NICE ELEMENT_1 2
VERY NICE ELEMENT_2 2
NOT SO GOOD ELEMENT 2
BAD ELEMENT 3
PERFECT FIFTH ELEMENT 1

换句话说,我需要做SELECT COUNT(*) GROUP BY 所做的事情。

我猜该算法应该通过将元素分成一些通常适合 RAM 的组来分多个阶段执行计数。然后它应该将这些组减少为一个组。但它怎么能做到这一点呢?它如何有效地合并这些组?

硬盘具有潜在的无限容量。编程语言无关紧要。我只需要知道一个抽象算法。我有过类似任务的经验,我需要对这种数组进行排序。在那里,我做了同样的事情,将所有元素分成多个分区,然后将它们合并到一个文件中。但在这种情况下,我不知道如何做“合并部分”。

提前感谢您的帮助。

【问题讨论】:

  • 为什么不使用数据库?它已经拥有您正在寻找的所有功能,并得到有效实施,
  • 对文件进行排序是一个选项。之后,只需按顺序处理文件并计算相等的项目,它们就会在一起。
  • 数据库。如果您不需要持久性,请禁用事务并写入日志以提高速度。然后你就像你说的那样做SELECT COUNT(*) GROUP BY
  • 您需要准确的结果吗?或者,是否可以接受近似值?
  • @Dobby007:如果近似值没问题,那么我们可以将字符串 散列 成一个数字并将它们放入 unordered_set。最后,unordered_set 的大小将是计数。我假设唯一计数将适合 RAM。

标签: arrays algorithm performance bigdata


【解决方案1】:

伪代码

第一阶段:部分处理+分区

M 为字符串到出现的映射

  1. 虽然没有到达输入文件的末尾
  2. 从文件中读取下一个字符串s
  3. 如果 s 不在 M 中,让 M[s] = 0
  4. M[s] += 1
  5. 如果M的大小超过一定的限制,对M进行key排序,写入一个新的临时文件Fi,然后重置 M
  6. 循环结束
  7. 像第 5 步一样刷新 M

第二阶段:合并

输入:文件 {Fi} 每行格式为“键值”(按键排序)

P成为三元组{K V F}的优先队列,其中排序仅由三元组的第一个元素执行,即K p>

  1. 对于来自 {Fi} 的每个文件 f,请读取来自的第一行“key valuef 并将 {key value f } 推入 P
  2. 令 current_key='', current_value=0
  3. 虽然 P 不为空
  4. Pop {key1 value1 f } 来自 P
  5. 如果还没有到达 f 的结尾,请从 f 中读取下一行“key2 value2”并推送 {key2 value2f} 回到 P,否则关闭并删除 f
  6. 如果current_key == key1那么current_value += value1,否则输出current_key current_value, 并设置current_key=key1, current_value=value1
  7. 循环结束
  8. 输出current_key current_value

Python 实现

#!/usr/bin/env python3

import sys
import heapq
import os
import tempfile

class Partitions(list):
    def __init__(self, stream, max_lines_per_partition, tmpdir):
        self.tmpdir=tmpdir
        m = {}
        for line in stream:
            line = line.rstrip('\n')
            if not line in m:
                if len(m) == max_lines_per_partition:
                    self.save(m)
                    m.clear()
                m[line] = 0
            m[line] += 1

        if len(m) > 0:
            self.save(m)
        del m

    def save(self, m):
        i = len(self)
        new_partition_fname = '{}/part{}'.format(self.tmpdir, i)
        self.append(new_partition_fname)
        f = open(new_partition_fname, 'w')
        for key in sorted(m.keys()):
            f.write('{} {}\n'.format(key, m[key]))
        f.close()


class PartitionEntryIterator:
    def __init__(self, fname):
        self.fname = fname
        self.f = open(fname, 'r')
        self.next()

    def next(self):
        line = self.f.readline()
        if len(line) != 0:
            self.key, self.count = line.rsplit(maxsplit=1)
            self.count = int(self.count.rstrip('\n'))
            return True
        else:
            return False

    def __del__(self):
        self.f.close()

    def __lt__(self, other):
        return self.key < other.key

def count_distinct_lines(lines, max_lines_per_partition):
    with tempfile.TemporaryDirectory() as tmpdir:
        h = []
        for fname in Partitions(lines, max_lines_per_partition, tmpdir):
            x = PartitionEntryIterator(fname)
            heapq.heappush(h, x)

        key = h[0].key
        count = 0
        while not len(h) == 0:
            x = heapq.heappop(h)
            if key == x.key:
                count += x.count
            else:
                yield (key, count)
                key, count = x.key, x.count
            if x.next():
                heapq.heappush(h, x)

        yield (key, count)

if __name__ == '__main__':

    if len(sys.argv) != 2:
        print('Usage:\n\t' + sys.argv[0] + ' <max-lines-per-partition>')
        exit(1)

    for key, count in count_distinct_lines(sys.stdin, int(sys.argv[1])):
        print(key, count, sep=': ')

【讨论】:

  • 我认为你的想法是目前最好的。您不需要对整个文件进行排序,然后再进行处理。您只需要根据原始数据接收到的排序分区。
【解决方案2】:

您可以对文件进行排序,然后按顺序对其进行处理并计算相等的项目,这些项目将放在一起。然后,只要一项与前一项不同,您就可以即时输出结果记录。

【讨论】:

    【解决方案3】:

    我会根据每行的哈希码将文件拆分为多个文件。从 1x 50GB 文件制作 1000x 50MB 文件。然后分别处理每个文件,它会毫无问题地放入内存。

    protected static string[] Partition(string inputFileName, string outPath, int partitions)
    {
        string[] fileNames = Enumerable.Range(0, partitions)
            .Select(i => Path.Combine(outPath, "part" + i))
            .ToArray();
    
        StreamWriter[] writers = fileNames
            .Select(fn => new StreamWriter(fn))
            .ToArray();
    
        StreamReader file = new StreamReader(inputFileName);
        string line;
        while ((line = file.ReadLine()) != null)
        {
            int partition = Math.Abs(line.GetHashCode() % partitions);
            writers[partition].WriteLine(line);
        }
        file.Close();
    
        writers.AsParallel().ForAll(c => c.Close());
    
        return fileNames;
    }
    
    protected static void CountFile(string inputFileName, StreamWriter writer)
    {
        Dictionary<string, int> dict = new Dictionary<string, int>();
    
        StreamReader file = new StreamReader(inputFileName);
        string line;
        while ((line = file.ReadLine()) != null)
        {
            int count;
            if (dict.TryGetValue(line, out count))
            {
                dict[line] = count + 1;
            }
            else
            {
                dict.Add(line, 1);
            }
        }
        file.Close();
    
        foreach (var kv in dict)
        {
            writer.WriteLine(kv.Key + ": " + kv.Value);
        }
    }
    
    protected static void CountFiles(string[] fileNames, string outFile)
    {
        StreamWriter writer = new StreamWriter(outFile);
        foreach (var fileName in fileNames)
        {
            CountFile(fileName, writer);
        }
        writer.Close();
    }
    
    static void Main(string[] args)
    {
        var fileNames = Partition("./data/random2g.txt", "./data/out", 211);           
        CountFiles(fileNames, "./data/random2g.out");
    }
    

    基准测试

    我决定尝试比较排序方法(Leon)和散列。如果你真的不需要它,排序是一项相当多的工作。我制作了包含 20 亿个数字的文件。分布(long)Math.Exp(rnd.NextDouble() * 30) 以相同的概率生成所有长度的数字(最多 14 个)。这种分布会产生许多独特的值,但同时也会产生重复多次的值。甚至字符的概率各不相同。这对人工数据来说还不错。

    File size: 16,8GiB
    Number of lines: 2G (=2000000000)
    Number of distinct lines: 576M
    Line occurences: 1..46M, average: 3,5
    Line length: 1..14, average: 7
    Used characters: '0', '1',...,'9'
    Character frequency: 8,8%..13%, average: 10%
    Disc: SSD
    

    排序结果

    10M lines in partition
    10M distinct lines in partition
    114 partitions
    Partition size: 131MiB
    Sum of partitions size: 14,6GiB
    Partitioning time: 105min
    Merging time: 180min
    Total time: 285min (=4hod 45min)
    

    这种方法可以节省空间,因为分区包含部分合并的数据。

    散列结果

    7M..54M lines in partition, average: 9,5M
    2723766..2732318 distinct lines in partition, average: 2,73M
    211 partitions
    Partition size 73MiB..207MiB, average: 81MiB
    Sum of partitions size: 16,8GiB
    Partitioning time: 6min
    Merging time: 15min
    Total time: 21min
    

    虽然每个分区的大小不同,但所有分区中不同行的数量几乎相同。这意味着哈希函数按预期工作。并且处理每个分区所需的内存是相同的。但确实不能保证,因此如果需要高可靠性,则必须为这些情况添加一些后备策略(将文件重新散列到更小,切换到对该文件进行排序等)。很有可能,它永远不会被真正使用,所以从性能的角度来看,这不是问题。

    散列比按因子排序超过 10,另一方面,其中一些可能源于 python 本身的低效。

    【讨论】:

    • 您讲述了如何将这个大文件分成小文件。但是您将如何处理这些小文件并接收最终结果(即每个唯一行都有重复的文件)?
    • @Dobby007 虽然一个字符串的所有精度都必须在同一个文件中结束,但这是微不足道的。但为了完整起见,我会添加它。
    • 现在我理解你了。但是,如果所有字符串(或大多数字符串)会因为相同的余数同时保持不同而落入一个桶中怎么办?
    • @Dobby007 所有散列算法都依赖于这样一个事实,即散列函数相当不错。如果大多数不同的字符串都在同一个桶中结束,如果你想使用这种方法,你需要更好的哈希函数。
    • 我同意你的看法。良好的散列函数为我们提供了针对不同输入字符串的均匀分布的散列。散列在整数范围(0 - 2 147 ... ...)内是好的,但是当我们计算余数(即散列 % 100)时,我们会减小其散列分布强度范围。我们的字符串将落入 100 个文件之一,在这种情况下,许多不同的字符串写入同一个文件的几率更高。
    【解决方案4】:

    浏览一个文件并按第一个字母将行索引放入文件中。

    一个 3,45,23... b 112,34,546...

    然后您可以并行处理这些,因为您只需要在每个文件中检查每个文件。

    至少这是我的第一个想法。

    如果单词大多是随机的并且并非所有内容都以相同的字母开头,或者最坏的情况是每个单词都相同,那么显然这是最好的。

    【讨论】:

    • 您能更好地解释一下您的想法吗?我需要对创建的索引文件(每个第一个字母及其对应的行号的文件)做什么?
    • 您只需要比较每个文件中的每个单词。这更容易,因为这意味着您不需要将每个单词与所有其他单词进行比较。这在理论上也可以并行运行,因为您可以并行读取文件。所以你有了行号,现在你可以从大文件中读取这些行并进行比较。如果你的效率很高,那么内存中的文件数量只有 2 倍(如果是英文,那就是 2*26),即一次 46 个单词。
    【解决方案5】:

    使用 hadoop 框架并对您的输入字符串执行 map reduce。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-10-22
      • 2012-05-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-04-21
      • 2020-12-01
      • 1970-01-01
      相关资源
      最近更新 更多