【问题标题】:ValveSubject: a queuing subject for Rx with built-in buffering, open/close operationsValveSubject:具有内置缓冲、打开/关闭操作的 Rx 队列主题
【发布时间】:2015-02-19 08:57:55
【问题描述】:

我经常遇到需要某种阀门构造来控制反应管道流量的情况。通常,在基于网络的应用程序中,我需要根据连接状态打开/关闭请求流。

这个阀门主体应该支持打开/关闭流,并以先进先出的顺序输出传递。当阀门关闭时,输入值应该被缓冲。

ConcurrentQueueBlockingCollection 通常用于此类场景,但这会立即将线程引入图片。我一直在寻找一个纯粹的被动解决方案来解决这个问题。

【问题讨论】:

  • 以上是解决这个问题的一个非常简洁的方法——3行Rx!
  • 确实很整洁!然而,正如我所提到的,观察者自己关闭了闸门(以调节来自下游的流量 - 作为一种无损失的背压机制的形式运行)。使用您的方法会以无法停止的“突发”释放缓冲元素。这个要求就是我将缓冲阶段的输出反馈到输入的原因。
  • 啊,是的。当然,没有什么可以阻止门流的输入依赖于数据流的输出 - 但我当然没有解决这个细节。

标签: c# system.reactive


【解决方案1】:

这里主要基于Buffer()BehaviorSubject实现。行为主体跟踪阀门的打开/关闭状态。阀门的开口开始缓冲窗口,阀门的关闭关闭这些窗口。缓冲区操作符的输出被“重新注入”到输入中(这样即使观察者自己也可以关闭阀门):

/// <summary>
/// Subject offering Open() and Close() methods, with built-in buffering.
/// Note that closing the valve in the observer is supported.
/// </summary>
/// <remarks>As is the case with other Rx subjects, this class is not thread-safe, in that
/// order of elements in the output is indeterministic in the case of concurrent operation 
/// of Open()/Close()/OnNext()/OnError(). To guarantee strict order of delivery even in the 
/// case of concurrent access, <see cref="ValveSubjectExtensions.Synchronize{T}(NEXThink.Finder.Utils.Rx.IValveSubject{T})"/> can be used.</remarks>
/// <typeparam name="T">Elements type</typeparam>
public class ValveSubject<T> : IValveSubject<T>
{
    private enum Valve
    {
        Open,
        Closed
    }

    private readonly Subject<T> input = new Subject<T>();
    private readonly BehaviorSubject<Valve> valveSubject = new BehaviorSubject<Valve>(Valve.Open);
    private readonly Subject<T> output = new Subject<T>();

    public ValveSubject()
    {
        var valveOperations = valveSubject.DistinctUntilChanged();
        input.Buffer(
            bufferOpenings: valveOperations.Where(v => v == Valve.Closed),
            bufferClosingSelector: _ => valveOperations.Where(v => v == Valve.Open))
            .SelectMany(t => t).Subscribe(input);
        input.Where(t => valveSubject.Value == Valve.Open).Subscribe(output);
    }

    public bool IsOpen
    {
        get { return valveSubject.Value == Valve.Open; }
    }

    public bool IsClosed
    {
        get { return valveSubject.Value == Valve.Closed; }
    }

    public void OnNext(T value)
    {
        input.OnNext(value);
    }

    public void OnError(Exception error)
    {
        input.OnError(error);
    }

    public void OnCompleted()
    {
        output.OnCompleted();
        input.OnCompleted();
        valveSubject.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return output.Subscribe(observer);
    }

    public void Open()
    {
        valveSubject.OnNext(Valve.Open);
    }

    public void Close()
    {
        valveSubject.OnNext(Valve.Closed);
    }
}

public interface IValveSubject<T>:ISubject<T>
{
    void Open();

    void Close();
}

冲洗阀门的其他方法有时会很有用,例如消除不再相关的剩余请求。这是一个基于先例适配器样式的实现:

/// <summary>
/// Subject with same semantics as <see cref="ValveSubject{T}"/>, but adding flushing out capability 
/// which allows clearing the valve of any remaining elements before closing.
/// </summary>
/// <typeparam name="T">Elements type</typeparam>
public class FlushableValveSubject<T> : IFlushableValveSubject<T>
{
    private readonly BehaviorSubject<ValveSubject<T>> valvesSubject = new BehaviorSubject<ValveSubject<T>>(new ValveSubject<T>());

    private ValveSubject<T> CurrentValve
    {
        get { return valvesSubject.Value; }
    }

    public bool IsOpen
    {
        get { return CurrentValve.IsOpen; }
    }

    public bool IsClosed
    {
        get { return CurrentValve.IsClosed; }
    }

    public void OnNext(T value)
    {
        CurrentValve.OnNext(value);
    }

    public void OnError(Exception error)
    {
        CurrentValve.OnError(error);
    }

    public void OnCompleted()
    {
        CurrentValve.OnCompleted();
        valvesSubject.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return valvesSubject.Switch().Subscribe(observer);
    }

    public void Open()
    {
        CurrentValve.Open();
    }

    public void Close()
    {
        CurrentValve.Close();
    }

    /// <summary>
    /// Discards remaining elements in the valve and reset the valve into a closed state
    /// </summary>
    /// <returns>Replayable observable with any remaining elements</returns>
    public IObservable<T> FlushAndClose()
    {
        var previousValve = CurrentValve;
        valvesSubject.OnNext(CreateClosedValve());
        var remainingElements = new ReplaySubject<T>();
        previousValve.Subscribe(remainingElements);
        previousValve.Open();
        return remainingElements;
    }

    private static ValveSubject<T> CreateClosedValve()
    {
        var valve = new ValveSubject<T>();
        valve.Close();
        return valve;
    }
}

public interface IFlushableValveSubject<T> : IValveSubject<T>
{
    IObservable<T> FlushAndClose();
}

正如评论中提到的,这些主题不是“线程安全的”,因为在并发操作的情况下不再保证交付顺序。以与标准 Rx SubjectSubject.Synchronize() (https://msdn.microsoft.com/en-us/library/hh211643%28v=vs.103%29.aspx) 类似的方式,我们可以引入一些扩展来提供阀门周围的锁定:

public static class ValveSubjectExtensions
{
    public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve)
    {
        return Synchronize(valve, new object());
    }

    public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve, object gate)
    {
        return new SynchronizedValveAdapter<T>(valve, gate);
    }

    public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve)
    {
        return Synchronize(valve, new object());
    }

    public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve, object gate)
    {
        return new SynchronizedFlushableValveAdapter<T>(valve, gate);
    }
}

internal class SynchronizedValveAdapter<T> : IValveSubject<T>
{
    private readonly object gate;
    private readonly IValveSubject<T> valve;

    public SynchronizedValveAdapter(IValveSubject<T> valve, object gate)
    {
        this.valve = valve;
        this.gate = gate;
    }

    public void OnNext(T value)
    {
        lock (gate)
        {
            valve.OnNext(value);    
        }
    }

    public void OnError(Exception error)
    {
        lock (gate)
        {
            valve.OnError(error);
        }
    }

    public void OnCompleted()
    {
        lock (gate)
        {
            valve.OnCompleted();
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return valve.Subscribe(observer);
    }

    public void Open()
    {
        lock (gate)
        {
            valve.Open();
        }
    }

    public void Close()
    {
        lock (gate)
        {
            valve.Close();
        }
    }
 }

 internal class SynchronizedFlushableValveAdapter<T> : SynchronizedValveAdapter<T>, IFlushableValveSubject<T>
 {
    private readonly object gate;
    private readonly IFlushableValveSubject<T> valve;

    public SynchronizedFlushableValveAdapter(IFlushableValveSubject<T> valve, object gate)
        : base(valve, gate)
    {
        this.valve = valve;
        this.gate = gate;
    }

    public IObservable<T> FlushAndClose()
    {
        lock (gate)
        {
            return valve.FlushAndClose();
        }
    }
} 

【讨论】:

  • 构造函数中的订阅不是泄露了吗?
【解决方案2】:

这是我使用延迟运算符的实现:

source.delay(new Func1<Integer, Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call(Integer integer) {
        return valve.filter(new Func1<Boolean, Boolean>() {
            @Override
            public Boolean call(Boolean aBoolean) {
                return aBoolean;
            }
        });
    }
})
.toBlocking()
.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        System.out.println("out: " + integer);
    }
});

这个想法是延迟所有源排放,直到“阀门打开”。如果阀门已经打开,则不会延迟物品的发射。

Rx valve gist

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多