【问题标题】:Observable wrapper for AsyncProducerConsumerQueueAsyncProducerConsumerQueue 的可观察包装器
【发布时间】:2019-08-22 09:36:02
【问题描述】:

所以我使用以下代码为 Stephen Cleary 的AsyncProducerConsumerQueue<T> 创建了一个可观察的包装器。

我想知道这里是否有人知道我如何以更简单的方式做到这一点?

  • 是否可以在没有包装类的情况下编写它?
  • 是否可以防止将多个包装器应用于一个队列的错误?
  • 我可以让它在第一次订阅时连接,而不是直接调用Connect 吗?如果是这样,这意味着什么?
  • 最后,你会怎么做?

using Nito.AsyncEx;
using System.Reactive;

static async Task ExampleUsage() {
    var queue = new AsyncProducerConsumerQueue<int>();
    var observable = queue.AsConnectableObservable();
    await queue.EnqueueAsync(1);
    observable.Subscribe(Console.WriteLine);
    observable.Connect();
    await queue.EnqueueAsync(2);
}

public static class AsyncExExtensions {
    public static IConnectableObservable<T> AsConnectableObservable<T>(this AsyncProducerConsumerQueue<T> queue) {
        return new ConnectableObservableForAsyncProducerConsumerQueue<T>(queue);
    }
}

class ConnectableObservableForAsyncProducerConsumerQueue<T> : IConnectableObservable<T> {

    readonly AsyncProducerConsumerQueue<T> Queue;

    long _isConnected = 0;
    ImmutableList<IObserver<T>> Observers = ImmutableList<IObserver<T>>.Empty;

    public ConnectableObservableForAsyncProducerConsumerQueue(AsyncProducerConsumerQueue<T> queue) {
        Queue = queue;
    }

    public IDisposable Connect() {
        if (Interlocked.Exchange(ref _isConnected, 1) == 1) throw new InvalidOperationException("Observable cannot be connected more than once.");
        var cts = new CancellationTokenSource();
        var token = cts.Token;
        Task.Run(async () => {
            try {
                while (true) {
                    token.ThrowIfCancellationRequested();
                    var @event = await Queue.DequeueAsync(token).ConfigureAwait(false);
                    foreach (var observer in Observers)
                        observer.OnNext(@event);
                }
            } catch (Exception x) when (x is OperationCanceledException || x is InvalidOperationException) {
                foreach (var observer in Observers)
                    observer.OnCompleted();
            }
        });
        return Disposable.Create(() => {
            cts.Cancel();
            cts.Dispose();
        });
    }

    readonly object subscriberListMutex = new object();
    public IDisposable Subscribe(IObserver<T> observer) {
        lock (subscriberListMutex) {
            Observers = Observers.Add(observer);
        }
        return Disposable.Create(() => {
            lock (subscriberListMutex) {
                Observers = Observers.Remove(observer);
            }
        });
    }
}

【问题讨论】:

    标签: c# system.reactive nito.asyncex


    【解决方案1】:

    免责声明:我不是专家,所以我可能忽略了这个答案的某些方面 - 请谨慎使用!

    考虑以下两个演示。在您有多个观察者的情况下,这些行为会有所不同。在第一个演示中,观察者将竞争队列中的项目,在第二个演示中,他们每个人都会收到一份副本。

    演示 #1 - 冷观测

    var queue = new AsyncProducerConsumerQueue<int>();
    
    // This is a cold observable, so each observer is fed by its own individual dequeue loop
    // and therefore will be 'competing' with other observers for queued items.
    var coldObservable = Observable
        // Create an observable that asynchronously waits for items to become available on the
        // queue and then emits them to the observer. This will be cancelled when the observer
        // is unsubscribed. 
        .Create<int>(async (observer, cancellationToken) =>
        {
            while (true)
            {
                var item = await queue.DequeueAsync(cancellationToken).ConfigureAwait(false);
                Console.WriteLine($"Dequeued {item}");
                observer.OnNext(item);
            }
        })
        // If an InvalidOperationException is thrown by the above, continue with
        // an empty observable instead of the error. This effectively catches an
        // `OnError(InvalidOperationException)` and turns it into an `OnCompleted()`.
        .Catch<int, InvalidOperationException>(exn =>
        {
            Console.WriteLine("Caught InvalidOperation");
            return Observable.Empty<int>();
        });
    
    Console.WriteLine("TEST COLD");
    
    await queue.EnqueueAsync(1);
    Console.WriteLine("Enqueued 1");
    
    Console.WriteLine("Subscribing A");
    coldObservable.Subscribe(
        item => Console.WriteLine($"A received {item}"),
        () => Console.WriteLine("A completed"));
    
    Console.WriteLine("Subscribing B");
    coldObservable.Subscribe(
        item => Console.WriteLine($"B received {item}"),
        () => Console.WriteLine("B completed"));
    
    await queue.EnqueueAsync(2);
    Console.WriteLine("Enqueued 2");
    
    await queue.EnqueueAsync(3);
    Console.WriteLine("Enqueued 3");
    
    queue.CompleteAdding();
    Console.WriteLine("Completed adding");
    
    Console.WriteLine("Waiting...");
    await Task.Delay(2000);
    
    Console.WriteLine("DONE");
    
    // TEST COLD
    // Enqueued 1
    // Subscribing A
    // Dequeued 1
    // A received 1
    // Subscribing B
    // Enqueued 2
    // Enqueued 3
    // Completed adding
    // Waiting...
    // Dequeued 2
    // Dequeued 3
    // A received 2
    // B received 3
    // Caught InvalidOperation
    // Caught InvalidOperation
    // A completed
    // B completed
    // DONE
    

    Demo #2 - Hot observable

    var queue = new AsyncProducerConsumerQueue<int>();
    
    var coldObservable = // defined same as above
    
    // This is a hot observable, so each observer receives the same items from the queue.
    var hotObservable = coldObservable
        // Publish the cold observable to create an `IConnectableObservable` that will subscribe
        // to the dequeue loop when connected and emit the same items to all observers.
        .Publish()
        // Automatically connect to the published observable when the first observer subscribes
        // and automatically disconnect when the last observer unsubscribes. This means that the
        // first observer will receive any items queued before it subscribes, but additional
        // observers will only receive items queued after they subscribed.
        .RefCount();
    
    Console.WriteLine("TEST HOT");
    
    await queue.EnqueueAsync(1);
    Console.WriteLine("Enqueued 1");
    
    Console.WriteLine("Subscribing A");
    hotObservable.Subscribe(
        item => Console.WriteLine($"A received {item}"),
        () => Console.WriteLine("A completed"));
    
    Console.WriteLine("Subscribing B");
    hotObservable.Subscribe(
        item => Console.WriteLine($"B received {item}"),
        () => Console.WriteLine("B completed"));
    
    await queue.EnqueueAsync(2);
    Console.WriteLine("Enqueued 2");
    
    await queue.EnqueueAsync(3);
    Console.WriteLine("Enqueued 3");
    
    queue.CompleteAdding();
    Console.WriteLine("Completed adding");
    
    Console.WriteLine("Waiting...");
    await Task.Delay(2000);
    
    Console.WriteLine("DONE");
    
    // TEST HOT
    // Enqueued 1
    // Subscribing A
    // Dequeued 1
    // A received 1
    // Subscribing B
    // Enqueued 2
    // Enqueued 3
    // Dequeued 2
    // Completed adding
    // Waiting...
    // A received 2
    // B received 2
    // Dequeued 3
    // A received 3
    // B received 3
    // Caught InvalidOperation
    // A completed
    // B completed
    // DONE
    

    回答您最初的问题:

    是否可以在没有包装类的情况下编写它?

    是的,请参阅上面的演示。

    是否有可能防止将多个包装器应用于一个队列的错误?

    上面演示的方法不会阻止其他方将项目出列(或对队列执行任何其他操作)。如果您想确保只为给定队列公开一个IObservable&lt;T&gt;,请考虑通过创建一个ObservableProducerConsumerQueue&lt;T&gt; 来封装队列本身,该ObservableProducerConsumerQueue&lt;T&gt; 在内部创建和管理它自己的AsyncProducerConsumerQueue。您可以公开一个仅委托给内部队列的 EnqueueAsync 方法,并使用上面演示的 observable 之一将 observable 作为属性公开或实现 IObservable&lt;T&gt; 接口。

    我可以让它在第一次订阅时连接,而不是通过直接调用 Connect 吗?如果是这样,这意味着什么?

    Demo #2 展示了这种行为并描述了其含义。如果您希望能够在连接之前订阅观察者,请跳过 RefCount 调用并像以前一样使用 Publish 返回的 IConnectableObservable

    最后,你会怎么做呢?

    如上所述,我会使用上面演示的方法之一封装队列并公开IObservableIConnectableObservable

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-11-14
      • 2019-12-22
      • 2017-09-07
      • 2016-12-14
      • 2016-09-12
      • 1970-01-01
      • 2017-10-25
      • 2019-03-15
      相关资源
      最近更新 更多