【发布时间】:2021-01-20 12:37:51
【问题描述】:
假设我们有一个每分钟输入一次的数据流,并且我们希望在前 10 分钟内跟踪前 5 个值。直觉上应该有一些队列解决方案,但我正在努力寻找一种优雅的方式,因为一个元素可以出于两种不同的原因弹出(它看起来在 11 分钟前或
任何建议将不胜感激!
【问题讨论】:
-
将您的数据放入 Pandas DataFrame 中,然后访问前 10 分钟内的 5 个最大值。
假设我们有一个每分钟输入一次的数据流,并且我们希望在前 10 分钟内跟踪前 5 个值。直觉上应该有一些队列解决方案,但我正在努力寻找一种优雅的方式,因为一个元素可以出于两种不同的原因弹出(它看起来在 11 分钟前或
任何建议将不胜感激!
【问题讨论】:
您不会想要实现队列,因为一旦您需要从顶部取出元素(如小于元素的情况),队列的便利性就会丢失。这是一个自定义类实现,可以满足您的要求——诀窍是在添加值之前检查列表的值。这很简单,但我希望它能给你一个想法:
from datetime import datetime, timedelta
class TopLatestN():
def __init__(self, max_size: int=5, timeframe_m: int=10):
self.__values = []
self.times = [] # Matching queue of times items are added
self.max_size = max_size
self.timeframe_m = timeframe_m
def add_value(self, value):
# Remove values that are too old
self.__check_times()
if len(self.__values) < self.max_size:
# Add the element right away if we aren't 'at capacity'
self.__values.append(value)
self.times.append(datetime.now())
else:
# Add the value only if it's large enough
new_index = self.__check_values()
if new_index is not None:
self.__values[new_index] = value
self.times[new_index] = datetime.now()
def get_values(self):
self.__check_times()
return self.__values
def __check_times(self):
# Remove values/times that are too old
current_time = datetime.now()
# Get matching list of values to keep
keep = []
for time in self.times:
keep.append(time + timedelta(minutes=self.timeframe_m) >= current_time)
# Replace values for all too-told times
self.__values = [val for i, val in enumerate(self.__values) if keep[i]]
self.times = [time for i, time in enumerate(self.times) if keep[i]]
def __check_values(self, value):
# Get index to store value at IF it is large enough - else return None
if any(value > val for val in self.__values):
return self.__values.index(min(self.__values)) # Replace smallest value
return None
【讨论】:
在这里不要忽略一个简单的事情 - 您可能会惊讶于它的效果。您有两个订单,因此请按排序顺序维护两个序列,bytime 按时间排序,byvalue 按值排序。在每个中存储 2 元组 (value, timestamp) 对。当然,您需要使它们保持同步。
因为byvalue 始终按值排序,所以您可以随时查看顶部n、底部n、中间n,或任何其他您想要的顺序统计.
假设您的时间戳(无论对您意味着什么)只会随着时间增加,按时间“排序”是微不足道的:使用 collections.deque 并在一端(例如,右侧)推送新记录并从另一端丢弃结尾。对byvalue 使用普通列表。要使旧记录过期,则:
oldest_to_retain = whatever form of timestamp you use
while bytime and bytime[0][1] < oldest_to_retain:
t = bytime.popleft() # discard expired record
# and remove it from the other seq too
i = bisect.bisect_left(byvalue, t)
assert byvalue[i] == t
del byvalue[i]
要插入传入的值,
t = (the_new_value, current_timestamp)
assert not bytime or bytime[-1][1] <= current_timestamp
bytime.append(t)
bisect.insort(byvalue, t)
现在有了一些经验,人们对这个想法犹豫不决,因为这些语句在 len(byvalue) 中具有 O() 行为线性:
del byvalue[i]
bisect.insort(byvalue, t)
(并且其他语句具有O(1) 或O(log(N)) 行为。)
有了更多的经验,他们就克服了 ;-) 这些以“C 速度”发生,除非 byvalue 增长到数百个元素,否则它通常比花哨的树结构更快 - 并且更节省空间 - 即使它们是用优化的 C 编码的。
如果byvalue 确实变大了,那么将byvalue 从广泛使用的sortedcontainers package 切换为使用SortedList 很容易。那么没有比O(log(N)) 更糟糕的陈述了。您的部分代码仍然简单、灵活且易于推理。
【讨论】: