【问题标题】:How to make items in a List self-remove after a given time如何在给定时间后使列表中的项目自删除
【发布时间】:2021-01-22 17:15:09
【问题描述】:

我的班级有一个字段:

IList<Tuple<Object,DateTime>> items = new List<Tuple<Object,DateTime>>();

还有一个类似的 API:

//Add 'o' to the list. At 'expiry' 'o' shoudl be removed from the list and some action perfromed
public void Add(Object o,DateTime expiry, Callback callback)
{
 items.Add(new Tuple<Object,DateTime>(o,DateTime.Now);
 ???
}

在指定的时间删除项目并执行一些操作。我确实想使用轮询循环。

我曾考虑过这样的事情:

public void Add(Object o,DateTime expiry, Callback callback)
{
 items.Add(new Tuple<Object,DateTime>(o,DateTime.Now);
 Task.Delay(expiry - DateTime.Now).ContinueWith(() => {items.Remove(o); callback.notify(o);});
}

显然这不是线程安全的,但拥有这么多任务似乎也是个坏主意。

有什么好的方法可以巧妙地完成这个目标? 添加详细信息:

  • 项目可能会在此期间以其他方式从列表中删除
  • 我可能想要一种方法来清除列表并停止所有这些任务

【问题讨论】:

  • 添加使用前修剪集合的代码。例如,您不希望项目从 foreach 中间的列表中消失,因此请控制它。
  • @LasseV.Karlsen 道歉我错过了一些实体需要知道关于到期的细节。否则你简单的惰性方法会更有意义。
  • 你是在说某种cache with expiration 吗?
  • MemoryCache 是您要找的吗?

标签: c# .net


【解决方案1】:

经过一番思考,我想到了以下想法:

  • 有 1 个任务无限运行,它检查集合并在一定时间后删除过期项目,调用过期的回调
  • 有一个返回未过期项目的字段

我假设您添加到该字段并从中读取多个线程,因此使用 BlockingCollection 进行线程安全添加,可以在 System.Collections.Concurrent 中找到,它实现了消费者生产者模式。

您可以使用仅返回存储中未过期项目的字段。存储会保存这些项目,直到无权运行的任务开始清理。

这是我的例子:

 private static readonly BlockingCollection<(object, DateTime, Callback)> _items = new BlockingCollection<(object, DateTime, Callback)>();
    private static bool _running;
public void Start()
    {
        _running = true;
        Task.Factory.StartNew(async () =>
        {
            while (_running)
            {
                await Task.Delay(1000);
                //block adding until we are finished cleaning up
                //we dont need to block when we invoke expired
                var expired = new List<(object, DateTime, Callback)>();
                lock (_items)
                {
                    var items = _items.ToArray();
                    var now = DateTime.Now;
                    var notExpired = items.Where(item => item.Item2 > now);
                    expired.AddRange(items.Where(item => item.Item2 <= now));
                    while(_items.Count > 0)
                    {
                        _items.Take();
                    }

                    foreach (var item in notExpired)
                    {
                        _items.Add(item);
                    }
                }

                foreach (var item in expired)
                {
                    var (o, _, callback) = item;
                    callback?.notify(o);
                }
            }
        });
    }
public IEnumerable<object> Items
    {
        get
        {
            var now = DateTime.Now;
            foreach (var item in _items.ToArray())
            {
                var (text, expiry, _) = item;
                if(expiry > now)
                    yield return text;
            }
        }
    }
    public void Add(object o, DateTime expiry, Callback callback)
    {
        _items.Add((new Random().Next(int.MaxValue).ToString(), DateTime.Now + TimeSpan.FromSeconds(10)), callback);
    }

【讨论】:

    【解决方案2】:

    几个月前,我们的任务是有点类似的 POC。在我们的例子中,我们需要Memoization,它将每个输入的操作结果缓存一段特定的时间。换句话说,如果您发出相同的操作(使用相同的输入调用相同的方法)并且您处于预定义的时间范围内,那么响应将从缓存中提供,否则它将执行原始请求。

    首先我们介绍了以下帮助类:

    class ValueWithTTL<T>
    {
        public Lazy<T> Result { get; set; }
        public DateTime ExpiresAt { get; set; }
    }
    

    ExpiresAt 代表未来某个时间Result 变得陈旧。

    我们使用了ConcurrentDictionary 来存储缓存的结果。

    这里是Memoizer帮助类的简化版:

    public static class Memoizer
    {
        public static Func<K, V> Memoize<K, V>(this Func<K, V> toBeMemoized, int ttlInMs = 5*1000)
            where K : IComparable
        {
            var memoizedValues = new ConcurrentDictionary<K, ValueWithTTL<V>>();
            var ttl = TimeSpan.FromMilliseconds(ttlInMs);
    
            return (input) => 
            {
                if (memoizedValues.TryGetValue(input, out var valueWithTtl))
                {
                    if (DateTime.UtcNow >= valueWithTtl.ExpiresAt)
                    {
                        memoizedValues.TryRemove(input, out _);
                        valueWithTtl = null;
                        Console.WriteLine($"!!!'{input}' has expired");
                    }
                }
    
                if (valueWithTtl != null)
                    return valueWithTtl.Result.Value;
    
                var toBeCached = new Lazy<V>(() => toBeMemoized(input));
                var toBeExpired = DateTime.UtcNow.AddMilliseconds(ttlInMs);
                var toBeCachedWithTimestamp = new ValueWithTTL<V> { Result = toBeCached, ExpiresAt = toBeExpired};
                memoizedValues.TryAdd(input, toBeCachedWithTimestamp);
                return toBeCachedWithTimestamp.Result.Value;
            };
        }
    }
    
    • 它接收一个Func,只有当给定的input 不存在于memoizedValues 中或它存在但ExpiresAt 比现在小时才会执行。
    • Memoize 返回一个与toBeMemoized 具有相同签名的Func,因此它可以很好地用作装饰器或包装器。

    这里是同步探测:

    private static readonly WebClient wclient = new WebClient();
    private static string[] uris = { "http://google.com", "http://9gag.com", "http://stackoverflow.com", "http://gamepod.hu", "http://google.com", "http://google.com", "http://stackoverflow.com" };
    
    static void SyncProbe(IEnumerable<string> uris)
    {
        var getAndCacheContent = Memoizer.Memoize<string, string>(wclient.DownloadString);
        var rand = new Random();
        foreach (var uri in uris)
        {
            var sleepDuration = rand.Next() % 1500;
            Thread.Sleep(sleepDuration);
            Console.WriteLine($"Slept: {sleepDuration}ms");
    
            var sp = Stopwatch.StartNew();
            _ = getAndCacheContent(uri);
            sp.Stop();
            Console.WriteLine($"'{uri}' request took {sp.ElapsedMilliseconds}ms");
        }
    }
    

    这里是异步探测:

    private static readonly HttpClient client = new HttpClient();
    private static string[] uris = { "http://google.com", "http://9gag.com", "http://stackoverflow.com", "http://gamepod.hu", "http://google.com", "http://google.com", "http://stackoverflow.com" };
    
    static async Task AsyncProbe(IEnumerable<string> uris)
    {
        var getAndCacheContent = Memoizer.Memoize<string, Task<string>>(client.GetStringAsync);
        var downloadTasks = new List<Task>();
        var rand = new Random();
        foreach (var uri in uris)
        {
            var sleepDuration = rand.Next() % 1500;
            await Task.Delay(sleepDuration);
            Console.WriteLine($"Slept: {sleepDuration}ms");
    
            downloadTasks.Add(Task.Run(async () =>
            {
                var sp = Stopwatch.StartNew();
                _ = await getAndCacheContent(uri);
                sp.Stop();
                Console.WriteLine($"'{uri}' request took {sp.ElapsedMilliseconds}ms");
            }));
        }
    
        await Task.WhenAll(downloadTasks.ToArray());
    }
    

    最后是一个示例输出

    Slept: 983ms
    'http://google.com' request took 416ms
    Slept: 965ms
    'http://9gag.com' request took 601ms
    Slept: 442ms
    'http://stackoverflow.com' request took 803ms
    Slept: 1047ms
    'http://gamepod.hu' request took 267ms
    Slept: 844ms
    !!!'http://google.com' has expired
    'http://google.com' request took 201ms
    Slept: 372ms
    'http://google.com' request took 0ms
    Slept: 302ms
    'http://stackoverflow.com' request took 0ms
    
    • 如您所见,我们向 google 发出了 3 个请求,其中 2 个已正常执行,其中 1 个从缓存中提供。无法从缓存中提供第二次尝试,因为它已过时。
    • 我们针对 stackoverflow 发出了 2 个请求,第一个请求正常执行,第二个请求从缓存中提供。

    这只是一个 POC,因此还有很大的改进空间:

    • 使memoizedValues 有界并使用一些驱逐策略
    • 利用WeakReference (1)
    • 尽管known limitation 使用MemoryCache

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-01-14
      • 2020-06-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多