【发布时间】:2021-12-06 16:59:52
【问题描述】:
问题的高级视图
我有 X 个源,其中包含有关我们环境中资产(主机名、IP、MAC、操作系统等)的信息。源包含从 1500 到 150k 的条目(至少是我现在使用的条目)。我的脚本应该查询它们中的每一个,收集这些数据,通过合并来自不同来源的相同资产的信息来消除重复数据,并返回所有条目的统一列表。我当前的实现确实有效,但对于更大的数据集来说速度很慢。我很好奇是否有更好的方法来完成我想做的事情。
普遍问题
通过合并相似条目来删除重复数据,但需要注意的是,合并两个资产可能会改变生成的资产是否与合并前与前两个相似的第三个资产相似。
示例:
~ 相似性,+ 合并
(之前)A~B~C
(之后) (A+B) ~ C 或 (A+B) !~ C
我试着寻找有同样问题的人,我只找到了What is an elegant way to remove duplicate mutable objects in a list in Python?,但它不包括对我来说至关重要的数据合并。
使用的类
为了便于阅读和理解而进行了简化,删除了不需要的部分 - 常规功能完好无损。
class Entry:
def __init__(self, source: List[str], mac: List[str] = [], ip: List[str] = [], hostname: List[str] = [], os: OS = OS.UNKNOWN, details: dict = {}):
# SO: Sorting and sanitization removed for simplicity
self.source = source
self.mac = mac
self.ip = ip
self.hostname = hostname
self.os = os
self.details = details
def __eq__(self, other):
if isinstance(other, Entry):
return (self.source == other.source and
self.os == other.os and
self.hostname == other.hostname and
self.mac == other.mac and
self.ip == other.ip)
return NotImplemented
def is_similar(self, other) -> bool:
def same_entry(l1: list, l2: list) -> bool:
return not set(l1).isdisjoint(l2)
if isinstance(other, Entry):
if self.os == OS.UNKNOWN or other.os == OS.UNKNOWN or self.os == other.os:
empty_hostnames = self.hostname == [] or other.hostname == []
empty_macs = self.mac == [] or other.mac == []
return (same_entry(self.hostname, other.hostname) or
(empty_hostnames and same_entry(self.mac, other.mac)) or
(empty_hostnames and empty_macs and same_entry(self.ip, other.ip)))
return False
def merge(self, other: 'Entry'):
self.source = _merge_lists(self.source, other.source)
self.hostname = _merge_lists(self.hostname, other.hostname)
self.mac = _merge_lists(self.mac, other.mac)
self.ip = _merge_lists(self.ip, other.ip)
self.os = self.os if self.os != OS.UNKNOWN else other.os
self.details = _merge_dicts(self.details, other.details)
def representation(self) -> str:
# Might be useful if anyone wishes to run the code
return f'<Entry from {self.source}: hostname={self.hostname}, MAC={self.mac}, IP={self.ip}, OS={self.os.value}, details={self.details}>'
def _merge_lists(l1: list, l2: list):
return list(set(l1) | set(l2))
def _merge_dicts(d1: dict, d2: dict):
"""
Merge two dicts without overwriting any data.
"""
# If either is empty, return the other one
if not d1:
return d2
if not d2:
return d1
if d1 == d2:
return d1
result = d1
for k, v in d2.items():
if k in result:
result[k + '_'] = v
else:
result[k] = v
return result
class OS(Enum):
'''
Enum specifying the operating system of the asset.
'''
UNKNOWN = 'Unknown'
WINDOWS = 'Windows'
LINUX = 'Linux'
MACOS = 'MacOS'
算法
每个算法都获取来自不同来源的条目列表,例如:
entries = [[entries from source A], [entries from source B], ..., [entries from source Z]]
主要去重功能
它是每个算法中使用的主要函数。它获取来自 2 个不同来源的条目列表,并将其组合到包含资产的列表中,并在需要时合并信息。
这可能是我最需要帮助的部分。这是我能想到的唯一方法。正因为如此,我专注于如何将这个函数的运行速度提高几倍,但是从缩短运行时间的角度来看,让这个函数更快是最好的。
def deduplicate(en1: List[Entry], en2: List[Entry]) -> List[Entry]:
"""
Deduplicates entries from provided lists by merging similar entries.
Entries in the lists are supposed to be already deduplicated.
"""
# If either is empty, return the other one
if not en1:
return en2
if not en2:
return en1
result = []
# Iterate over longer and check for similar in shorter
if len(en2) > len(en1):
en1, en2 = en2, en1
for e in en1:
# walrus operator in Python 3.8 or newer
while (similar := next((y for y in en2 if y.is_similar(e)), None)) is not None:
e.merge(similar)
en2.remove(similar)
del similar
result.append(e)
result.extend(en2)
return result
此处不适用常规重复数据删除(例如使用集合)的原因是因为将一个条目与另一个新条目合并可能会变得相似,例如:
In [2]: e1 = Entry(['SRC_A'], [], ['1.1.1.1'], [], OS.UNKNOWN)
In [3]: e2 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], ['1.1.1.1'], [], OS.UNKNOWN)
In [4]: e3 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], [], [], OS.UNKNOWN)
In [5]: e1.is_similar(e2)
Out[5]: True
In [6]: e1.is_similar(e3) # at first it's not similar
Out[6]: False
In [7]: e1.merge(e2)
In [8]: e1.is_similar(e3) # but after merging it is
Out[8]: True
第一种方法 - 顺序
我的第一个想法是最简单的,就是简单的递归。
def dedup_multiple(lists: List[List[Entry]]) -> List[Entry]:
"""Deduplication helper allowing for providing more than 2 sources."""
if len(lists) == 1:
return lists[0]
return deduplicate(lists[0], dedup_multiple(lists[1:]))
第二种方法 - 使用池的多线程
这就是我目前使用的方法。到目前为止,它是最快的并且相当简单。
def async_dedup(lists: List[List[Entry]]) -> List[Entry]:
"""Asynchronous deduplication helper allowing for providing more than 2 sources."""
with mp.Pool() as pool:
while len(lists) > 1:
if len(lists) % 2 == 1:
lists.append([])
data = [(lists[i], lists[i+1]) for i in range(0, len(lists), 2)]
lists = pool.map_async(_internal_deduplication, data).get()
return lists[0]
def _internal_deduplication(en):
return deduplicate(*en)
但我很快意识到,如果一项任务比其他任务花费的时间长得多(例如,因为对最大的源进行重复数据删除),那么其他所有任务都会等待而不是工作。
第三种方法 - 使用队列和进程的多线程
当我试图加快第二种方法时,我遇到了How to use python multiprocessing pool in continuous loop 和Filling a queue and managing multiprocessing in python,我想出了以下解决方案。
def async_dedup2(lists: List[List[Entry]]) -> List[Entry]:
tasks_number = min(os.cpu_count(), len(lists) // 2)
args = lists[:tasks_number]
with mp.Manager() as manager:
queue = manager.Queue()
for l in lists[tasks_number:]:
queue.put(l)
processes = []
for arg in args:
proc = mp.Process(target=test, args=(queue, arg, ))
proc.start()
processes.append(proc)
for proc in processes:
proc.join()
return queue.get()
def test(queue: mp.Queue, arg: List[Entry]):
while not queue.empty():
try:
arg2: List[Entry] = queue.get()
except Empty:
continue
arg = deduplicate(arg, arg2)
queue.put(arg)
我认为这将是最好的解决方案,因为如果可能的话,不会有不处理数据的时刻,但经过测试,它几乎总是比第二种方法稍慢。
运行时比较
Source A 1510
Source B 1509
Source C 5000
Source D 4460
Source E 5000
Source F 2084
Deduplicating.....
SYNC - Execution time: 188.6127771000 - Count: 13540
ASYNC - Execution time: 68.249583 - Count: 13532
ASYNC2 - Execution time: 69.416046 - Count: 13532
Source A 1510
Source B 1509
Source C 11821
Source D 13871
Source E 5001
Source F 2333
Deduplicating.....
ASYNC - Execution time: 424.405793 - Count: 26229
ASYNC2 - Execution time: 522.697551 - Count: 26405
【问题讨论】:
-
您是否知道两个相似的条目在合并后会变得不相似?假设A、B、C有相同的mac地址,A没有主机名,B和C有不同的。那么 A 与两者相似。但是将 B 合并到 A 中,现在 A 和 C 不一样了。
-
是的,我知道这可能会发生;但是,我不确定如何正确解决该问题。如果我决定不合并此类案例,我首先需要发现它们(这将需要大量额外的计算来临时合并并检查相似条目的数量是否已更改)并留下更大的相似条目集。如果我想合并它们,我可以保持原样并希望它只是可忽略的子集,或者我需要在合并时实现某种类型的优先级,这也需要前面解释过的某种类型的发现。
标签: python multithreading algorithm merge duplicates