【问题标题】:Need suggestion on In-Memory pull based queue library需要有关基于内存拉取的队列库的建议
【发布时间】:2021-11-15 11:22:23
【问题描述】:

我正在为技术工作做准备,遇到了一个现在经常被问到的面试问题。我在 LeetCode 等网站上找到了一些代码 sn-ps,但大部分都是 Java 代码。我想知道我是否在面试中遇到了这个问题,有没有办法用 C# 设计/编写解决方案(PriorityQueue 之类的东西在这里不可用)?

以下是基本用例。

  1. 图书馆维护的多个队列

  2. 每个队列必须支持多个发布者和订阅者。

  3. 每个队列都有一个最长保留期,超过该保留期后,队列中的消息不应驻留在内存中。

  4. 队列中的每条消息都可以有一个可选的 TTL 值。任何具有过期 TTL 的消息都不应被任何订阅者消费,也不应驻留在内存中。

  5. 每个消费者都应该阅读所有的消息。

之前在这里Multiple Producers/Consumers 已经发布过类似的东西(多个生产者/多个消费者),但在 C# 上没有太多可用的在线内容。

关于如何使用标准 .NET API 来为此设计解决方案的任何建议。

【问题讨论】:

    标签: c# multithreading producer-consumer


    【解决方案1】:

    Reactive Extensions 库 (Rx) 似乎是解决此问题的理想选择。它基于两个接口:IObservable<T> = 发布者和IObserver<T> = 订阅者,并提供这些接口的许多实现和组合器。例如,ReplaySubject<T> 类实现了这两个接口,并且它有一个构造函数,该构造函数接受参数 TimeSpan window,该参数定义了回放缓冲区的最大时间长度。

    您可以在下面看到如何实例化具有 30 秒保留期的 ReplaySubject<T>,如何将项目排入其中,如何订阅将接收主题中当前所有项目通知的消费者,以及未来将排队的所有项目,以及如何结束订阅以停止接收通知:

    using System.Reactive.Subjects;
    //...
    var subject = new ReplaySubject<Item>(TimeSpan.FromSeconds(30));
    //...
    subject.OnNext(new Item());
    //...
    var subscription = subject.Subscribe((Item x) => Console.WriteLine($"Received {x}"));
    //...
    subscription.Dispose();
    

    至于“过期 TTL”功能,这似乎很棘手,因为 AFAIK 在 Rx 中没有提供此功能的内置组件。我考虑将其实现为自定义ISubject&lt;T&gt;,基于嵌套主题的结构:ReplaySubject&lt;ReplaySubject&lt;T&gt;&gt;。内部主题将包含单个值,并且将使用等于特定项目的 TTL 过期时间的window 进行实例化。消费者将在将其与Merge 运算符合并后订阅此结构。这个想法源于这个问题:How can I clear the buffer on a ReplaySubject? 这里是一个实现:

    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcasted to all subscribed and future observers, subject
    /// to buffer trimming and notification expiration policies.
    /// </summary>
    public class ExpirableReplaySubject<T> : ISubject<T>
    {
        private readonly TimeSpan _window;
        private readonly Func<T, TimeSpan> _expireAfterSelector;
        private readonly ReplaySubject<ISubject<T>> _replaySubject;
        private readonly IObservable<T> _replaySubjectMerged;
    
        public ExpirableReplaySubject(TimeSpan window,
            Func<T, TimeSpan> expireAfterSelector)
        {
            _window = window;
            _expireAfterSelector = expireAfterSelector;
            _replaySubject = new ReplaySubject<ISubject<T>>(window);
            _replaySubjectMerged = _replaySubject.Merge();
        }
    
        public void OnNext(T value)
        {
            var expireAfter = _expireAfterSelector(value);
            if (expireAfter > _window) expireAfter = _window;
            var inner = new ReplaySubject<T>(1, expireAfter); inner.OnNext(value);
            _replaySubject.OnNext(inner);
        }
    
        public void OnCompleted()
        {
            // All subjects, inner and outer, must be completed
            _replaySubject.OnCompleted();
            _replaySubject.Subscribe(subject => subject.OnCompleted());
        }
    
        public void OnError(Exception error)
        {
            // Faulting the outer subject is enough
            _replaySubject.OnError(error);
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _replaySubjectMerged.Subscribe(observer);
        }
    }
    

    window 参数配置主题的保留时间跨度,expireAfterSelector 是一个选择器,针对收到的每个新项目运行,并返回此特定项目的到期时间。

    这种方法的问题在于,当内部主体从外部_replaySubject 中逐出时,根据window 策略,它们没有完成(它们的OnCompleted 方法没有被调用)。如果不完成内部主题,他们将永远不会取消订阅_replaySubjectMerged,从而导致永久性内存泄漏。所以需要做更多的工作。我们可以实现一个轻量级的ISubject&lt;T&gt;,而不是使用ReplaySubject&lt;T&gt;s 和等于1bufferSize 作为内部主题:

    private class ExpirableValueSubject<T> : ISubject<T>
    {
        private T _value;
        private Timer _timer; // Becomes null when the subject is completed
    
        public ExpirableValueSubject(T value, TimeSpan expireAfter)
        {
            if (expireAfter <= TimeSpan.Zero) return; // Expired upon creation
            _value = value;
            _timer = new Timer(arg => ((ExpirableValueSubject<T>)arg).OnCompleted(),
                this, expireAfter, Timeout.InfiniteTimeSpan);
        }
    
        public void OnNext(T value) => throw new InvalidOperationException();
        public void OnError(Exception error) => throw new InvalidOperationException();
        public void OnCompleted()
        {
            lock (this) { _timer?.Dispose(); _timer = null; _value = default; }
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            lock (this)
            {
                if (_timer != null) observer.OnNext(_value);
                observer.OnCompleted();
            }
            return Disposable.Empty;
        }
    }
    

    此自定义主题在订阅时立即发出OnCompleted 通知,其OnCompleted 方法已被劫持以释放_value 并处置_timer

    这个类使用this作为一个储物柜对象,is not advisable in general。在这种情况下,该类旨在作为一个内部组件,并且对该类实例的引用不会泄漏到外部世界,因此锁定 this 应该没问题。

    要完成我们的解决方案,您所要做的就是替换这一行:

    var inner = new ReplaySubject<T>(1, expireAfter); inner.OnNext(value);
    

    ...用这个:

    var inner = new ExpirableValueSubject<T>(value, expireAfter);
    

    通过此更改,ExpirableReplaySubject&lt;T&gt; 类应该可以按预期工作,而不会泄漏内存。当然,它是线程安全的,因为所有针对一般用途的 ISubject&lt;T&gt; 实现都应该是。

    使用示例:

    var subject = new ExpirableReplaySubject<Item>(TimeSpan.FromSeconds(30),
        item => item.TTL);
    

    注意:原始实现 (Revision 1) 使用封装的 BehaviorSubject&lt;T&gt; 作为内部主题。这种方法是有问题的,因为BehaviorSubject&lt;T&gt; 仅在处置时才释放其内部值,而不是在完成时。并且在处理它之后变得无法使用(它抛出ObjectDisposedExceptions)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-07-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多