【Question title】: .NET Rx: in-order batch-processing of messages
【Posted】: 2011-02-01 20:38:07
【Question】:

I'm trying to implement an asynchronous workflow using Rx, but I seem to be getting it completely wrong.

What I want to do is:

1. Start from an undefined asynchronous stream of unparsed message strings (i.e. an IObservable<string>).
2. Parse the message strings asynchronously, but preserve their order (IObservable<Message>).
3. Batch up the parsed Messages in groups of 100 or so (IObservable<IEnumerable<Message>>).
4. Send each batch, when complete, to the UI thread to be processed. Batches must arrive in the same order in which they were started.

I can't seem to get the order preserved, and Rx also doesn't seem to do things asynchronously the way I'd expect.

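I tried preserving the order by using an IEnumerable instead of an IObservable and then calling the .AsParallel().AsOrdered() operators on it. Here is the code; see the comments in it for the problems I'm running into: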

    private IObservable<IEnumerable<Message>> messageSource;
    public IObservable<IEnumerable<Message>> MessageSource { get { return messageSource; } }

    /// <summary>
    /// Sub-classes of MessageProviderBase provide this IEnumerable to 
    /// generate unparsed message strings synchronously
    /// </summary>
    protected abstract IEnumerable<string> UnparsedMessages { get; }

    public MessageProviderBase()
    {
        // individual parsed messages as a PLINQ query
        var parsedMessages = from unparsedMessage in UnparsedMessages.AsParallel().AsOrdered()
                             select ParseMessage(unparsedMessage);

        // convert the above PLINQ query to an observable, buffering up to 100 messages at a time
        var batchedMessages
            = parsedMessages.ToObservable().BufferWithTimeOrCount(TimeSpan.FromMilliseconds(200), 100);

        // ISSUE #1:
        // batchedMessages seems to call OnNext before all of the messages in its buffer are parsed.
        // If you convert the IObservable<Message> it generates to an enumerable, it blocks
        // when you try to enumerate it. 

        // Convert each batch to an IEnumerable
        // ISSUE #2: Even if the following Rx query were to run asynchronously (it doesn't now, see the above comment),
        // it could still deliver messages out of order. Only, instead of delivering individual
        // messages out of order, the message batches themselves could arrive out of order.
        messageSource = from messageBatch in batchedMessages
                        select messageBatch.ToEnumerable().ToList();
    }
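For comparison, the order-preserving parallel parse and the batching into groups of 100 described above can be sketched with PLINQ alone, without Rx. This is a minimal sketch under stated assumptions. Message, ParseMessage and ParseAndBatch are hypothetical stand-ins, because the question does not show the real parser. The method also runs synchronously, so in a WPF app it would have to run off the UI thread, with each batch marshalled back.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the question's Message type.
public sealed class Message
{
    public int Id { get; }
    public Message(int id) { Id = id; }
}

public static class PlinqBatching
{
    // Hypothetical parser; the question's real ParseMessage is not shown.
    public static Message ParseMessage(string raw) => new Message(int.Parse(raw));

    // Parses in parallel, keeps source order, then cuts the ordered
    // results into consecutive batches of at most batchSize items.
    public static List<List<Message>> ParseAndBatch(IEnumerable<string> raw, int batchSize)
    {
        var batches = new List<List<Message>>();
        var current = new List<Message>(batchSize);

        // AsOrdered() makes PLINQ yield results in source order,
        // even though ParseMessage runs on several threads.
        foreach (var msg in raw.AsParallel().AsOrdered().Select(ParseMessage))
        {
            current.Add(msg);
            if (current.Count == batchSize)
            {
                batches.Add(current);
                current = new List<Message>(batchSize);
            }
        }
        if (current.Count > 0) batches.Add(current);   // trailing partial batch
        return batches;
    }
}
```

AsOrdered() is what keeps the results in source order. Without it, PLINQ may yield them in whatever order the worker threads finish.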

【Question comments】:

    Tags: c# .net wpf system.reactive


    【Answer 1】:

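    My answer below is loosely based on Enigmativity's code, but it fixes some completion-related race conditions and also adds support for cancellation and custom schedulers (which makes unit testing much easier).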

    public static IObservable<U> Fork<T, U>(this IObservable<T> source,
        Func<T, U> selector)
    {
        return source.Fork<T, U>(selector, Scheduler.TaskPool);
    }
    
    public static IObservable<U> Fork<T, U>(this IObservable<T> source,
        Func<T, U> selector, IScheduler scheduler)
    {
        return Observable.CreateWithDisposable<U>(observer =>
        {
            var runningTasks = new CompositeDisposable();
    
            var lockGate = new object();
            var queue = new Queue<ForkTask<U>>();
            var completing = false;
            var faulted = false;   // true once OnError has been sent
    
            Action<Exception> onError = ex =>
            {
                lock(lockGate)
                {
                    if (faulted) return;      // report only the first error
                    faulted = true;
                    queue.Clear();
                    runningTasks.Dispose();   // cancel work that is still scheduled
                    observer.OnError(ex);
                }
            };
    
            // Emits every completed result at the head of the queue, in arrival order.
            Action dequeue = () =>
            {
                lock (lockGate)
                {
                    while (queue.Count > 0 && queue.Peek().Completed)
                    {
                        var task = queue.Dequeue();
                        observer.OnNext(task.Value);
                    }
                    if (!faulted && completing && queue.Count == 0)
                    {
                        observer.OnCompleted();
                    }
                }
            };
    
            Action onCompleted = () =>
            {
                lock (lockGate)
                {
                    completing = true;
                    dequeue();
                }
            };
    
            Action<T> enqueue = t =>
            {
                var cancellation = new MutableDisposable();
                var task = new ForkTask<U>();
    
                lock(lockGate)
                {
                    if (faulted) return;      // ignore values that arrive after an error
                    runningTasks.Add(cancellation);
                    queue.Enqueue(task);
                }
    
                cancellation.Disposable = scheduler.Schedule(() =>
                {
                    try
                    {
                        task.Value = selector(t);
    
                        lock(lockGate)
                        {
                            task.Completed = true;
                            runningTasks.Remove(cancellation);
                            dequeue();
                        }
                    }
                    catch(Exception ex)
                    {
                        onError(ex);
                    }
                });
            };
    
            return new CompositeDisposable(runningTasks, 
                source.AsObservable().Subscribe(
                    t => { enqueue(t); },
                    x => { onError(x); },
                    () => { onCompleted(); }
                ));
        });
    }
    
    private class ForkTask<T>
    {
        public T Value = default(T);
        public bool Completed = false;
    }
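    The ordering in Fork comes entirely from the queue of ForkTask slots. Each value reserves a slot when it arrives, and dequeue releases values only from the head of the queue, and only once they are completed. Stripped of Rx, scheduling and locking, that reorder step looks roughly like the sketch below. ReorderBuffer is a hypothetical name, and the class is deliberately single-threaded. A concurrent version would need a lock around both methods, as Fork does with lockGate.

```csharp
using System;
using System.Collections.Generic;

// Single-threaded sketch of Fork's ordering idea: results may complete in
// any order, but each value is released only after every earlier slot has
// completed. Hypothetical helper, not part of Rx.
public sealed class ReorderBuffer<T>
{
    private sealed class Slot { public T Value; public bool Completed; }

    private readonly Queue<Slot> _pending = new Queue<Slot>();
    private readonly Action<T> _release;

    public ReorderBuffer(Action<T> release) { _release = release; }

    // Reserves a position in arrival order (the role of Fork's "enqueue").
    public object Reserve()
    {
        var slot = new Slot();
        _pending.Enqueue(slot);
        return slot;
    }

    // Marks a reserved slot as finished, then drains every completed slot
    // at the head of the queue (the role of Fork's "dequeue").
    public void Complete(object token, T value)
    {
        var slot = (Slot)token;
        slot.Value = value;
        slot.Completed = true;
        while (_pending.Count > 0 && _pending.Peek().Completed)
            _release(_pending.Dequeue().Value);
    }
}
```

    For example, if three slots are reserved and the third completes first, nothing is released until the first one completes. Completing the second then releases both the second and the third values.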
    

    Here's an example that randomizes the task execution times to test it:

    AutoResetEvent are = new AutoResetEvent(false);
    
    Random rand = new Random();
    
    Observable.Range(0, 5)
        .Fork(i =>
        {
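            // Random is not thread-safe and the selector runs on several
            // task-pool threads at once, so guard access to it.
            int delay;
            lock (rand) delay = rand.Next(50, 500);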
            Thread.Sleep(delay);
    
            return i + 1;
        })
        .Subscribe(
            i => Console.WriteLine(i),
            () => are.Set()
        );
    
    are.WaitOne();
    
    Console.ReadLine();
    

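    【Comments】:

    • Unfortunately, this solution doesn't work. Even after I got it to compile (Tuple is immutable; it looks like you have some Scala in there), the returned IObservable doesn't produce any results. I believe it is deadlocking on its own thread.
    • @Jeremy - You're right, there were compile errors all over it; sorry about that. I've fixed it and swapped the Tuple dependency for a custom class. However, I can't reproduce the deadlock, so I've included an example usage that randomizes the task lengths. I also tested it with Scheduler.Immediate to make sure it works when tasks complete immediately.
    • It looks like the deadlock was caused by my misuse of the BufferWithTimeOrCount operator earlier in the workflow.
    【Answer 2】:

    Suppose you have: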

    IObservable<string> UnparsedMessages = ...;
    Func<string, Message> ParseMessage = ...;
    

    Then you can use the SelectAsync extension method like this:

    IObservable<Message> ParsedMessages = UnparsedMessages.SelectAsync(ParseMessage);
    

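    The SelectAsync extension method processes each unparsed message asynchronously and ensures that the results are returned in the order in which the messages arrived.

    Let me know whether this does what you need.

    Here's the code: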

    public static IObservable<U> SelectAsync<T, U>(this IObservable<T> source,
        Func<T, U> selector)
    {
        var subject = new Subject<U>();
        var queue = new Queue<System.Threading.Tasks.Task<U>>();
        var completing = false;
        var subscription = (IDisposable)null;
    
        Action<Exception> onError = ex =>
        {
            queue.Clear();
            subject.OnError(ex);
            // subscription can still be null if the source fails during Subscribe.
            if (subscription != null) subscription.Dispose();
        };
    
        Action dequeue = () =>
        {
            lock (queue)
            {
                var error = false;
                while (queue.Count > 0 && queue.Peek().IsCompleted)
                {
                    var task = queue.Dequeue();
                    if (task.Exception != null)
                    {
                        error = true;
                        onError(task.Exception);
                        break;
                    }
                    else
                    {
                        subject.OnNext(task.Result);
                    }
                }
                if (!error && completing && queue.Count == 0)
                {
                    subject.OnCompleted();
                    if (subscription != null) subscription.Dispose();   // may still be null if everything finished during Subscribe
                }
            }
        };
    
        Action<T> enqueue = t =>
        {
            if (!completing)
            {
                var task = new System.Threading.Tasks.Task<U>(() => selector(t));
                queue.Enqueue(task);
                task.ContinueWith(tu => dequeue());
                task.Start();
            }
        };
    
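        subscription = source.Subscribe(
            t => { lock(queue) enqueue(t); },
            x => { lock(queue) onError(x); },
            // Drain on completion too: if every task has already finished,
            // no later continuation would ever call OnCompleted.
            () => { lock(queue) { completing = true; dequeue(); } });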
    
        return subject.AsObservable();
    }
    

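    I eventually needed to revisit this work and wrote a more robust version of this code (also based on Richard's answer).

    The main advantage of this code is that it has no explicit queue at all. It relies purely on task continuations to put the results in order. Works a treat!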

    public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, U> selector)
    {
        return source.ForkSelect<T, U>(t => Task<U>.Factory.StartNew(() => selector(t)));
    }
    
    public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, Task<U>> selector)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (selector == null) throw new ArgumentNullException("selector");
        return Observable.CreateWithDisposable<U>(observer =>
        {
            var gate = new object();
            var onNextTask = Task.Factory.StartNew(() => { });
            var sourceCompleted = false;
            var taskErrored = false;
    
            Action<Exception> onError = ex =>
            {
                sourceCompleted = true;
                onNextTask = onNextTask.ContinueWith(t =>
                {
                    if (!taskErrored) observer.OnError(ex);      // never signal a second error
                });
            };
    
            Action onCompleted = () =>
            {
                sourceCompleted = true;
                onNextTask = onNextTask.ContinueWith(t =>
                {
                    if (!taskErrored) observer.OnCompleted();    // no OnCompleted after OnError
                });
            };
    
            Action<T> onNext = t =>
            {
                var task = selector(t);
                onNextTask = Task.Factory.ContinueWhenAll(new[] { onNextTask, task }, ts =>
                {
                    if (!taskErrored)
                    {
                        if (task.IsFaulted)
                        {
                            taskErrored = true;
                            observer.OnError(task.Exception);
                        }
                        else
                        {
                            observer.OnNext(task.Result);
                        }
                    }
                });
            };
    
            var subscription = source
                .AsObservable()
                .Subscribe(
                    t => { if (!sourceCompleted) lock (gate) onNext(t); },
                    ex => { if (!sourceCompleted) lock (gate) onError(ex); },
                    () => { if (!sourceCompleted) lock (gate) onCompleted(); });
    
            var @return = new CompositeDisposable(subscription);
    
            return @return;
        });
    }
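    To see the queue-free ordering idea on its own, here is a minimal sketch that uses only System.Threading.Tasks. ContinuationChain.Run is a hypothetical helper, not part of Rx or the TPL. Each item's delivery is a ContinueWhenAll over the previous delivery and the item's own work task, which is the same chaining that onNext performs above. As a result, values are delivered in input order even when the work finishes out of order. Error handling is omitted: a faulted work task makes its own delivery fault, but later deliveries still run.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ContinuationChain
{
    // Starts one task per input and chains delivery so that results reach
    // "deliver" strictly in input order, with no explicit queue.
    // Illustrative helper only.
    public static Task Run<T, U>(IEnumerable<T> inputs, Func<T, U> selector, Action<U> deliver)
    {
        // An already-completed task that starts the chain.
        Task previous = Task.FromResult(0);

        foreach (var input in inputs)
        {
            Task<U> work = Task.Run(() => selector(input));

            // This delivery runs only after BOTH the previous delivery and
            // this item's work have finished, which preserves input order.
            previous = Task.Factory.ContinueWhenAll(
                new Task[] { previous, work },
                _ => deliver(work.Result));
        }

        // Completes after the last delivery, and therefore after all of them.
        return previous;
    }
}
```

    Because each delivery waits for the one before it, deliver is never called concurrently. A plain List can therefore collect the results without extra locking.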
    

    And the SelectMany overloads that make LINQ query syntax work are:

    public static IObservable<U> SelectMany<T, U>(this IObservable<T> source, Func<T, Task<U>> selector)
    {
        return source.ForkSelect<T, U>(selector);
    }
    
    public static IObservable<V> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Task<U>> taskSelector, Func<T, U, V> resultSelector)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (taskSelector == null) throw new ArgumentNullException("taskSelector");
        if (resultSelector == null) throw new ArgumentNullException("resultSelector");
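        // Subscribe to the source only once: running resultSelector in a
        // continuation of taskSelector avoids the second subscription that
        // source.Zip(source.ForkSelect(...)) would create.
        return source.ForkSelect<T, V>(t => taskSelector(t).ContinueWith(task => resultSelector(t, task.Result)));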
    }
    

    These methods can now be used like this:

    var observableOfU = observableOfT.ForkSelect(funcOfT2U);
    

    Or:

    var observableOfU = observableOfT.ForkSelect(funcOfT2TaskOfU);
    

    Or:

    var observableOfU =
        from t in observableOfT
        from u in funcOfT2TaskOfU(t)
        select u;
    

    Enjoy!

    【Comments】:

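    • Looks good! I'd suggest removing the subject and wrapping the code in Observable.CreateWithDisposable to avoid race conditions. You could then return subscription, which is disposed automatically on OnCompleted/OnError, so you could remove that code as well. You can also drop all of the call-order protection (e.g. onCompleted after onError) and subscribe to source.AsObservable() instead, since that enforces the ordering for you.
    • Also, when you assign a subscription to a local variable and then use that variable from inside the observer, I'd recommend using MutableDisposable to prevent a possible NullReferenceException.
    • Finally (sorry), if you're using .NET 4, you can support a CancellationTokenSource through Disposable.Create (combined with the source subscription using a CompositeDisposable), which allows the other tasks to be cancelled if one of them errors.
    • I'm not sure I understand the race condition with using a subject rather than Observable.CreateWithDisposable. Could you elaborate, or give an example? Thanks! Also, is there a particular reason ConcurrentQueue isn't used in the code above? This will be running on .NET 4.
    • @Jeremy - The tasks can complete before the subject has been returned, and CreateWithDisposable handles that for you. As for ConcurrentQueue, you never mentioned .NET 4, so I guess Enigmativity didn't want to require it in his example.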
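    The cancel-on-first-error suggestion in the comments can be sketched outside Rx with a shared CancellationTokenSource. This is an illustrative sketch only. CancelOnFirstError.Run and the Func<CancellationToken, Task> job shape are assumptions. The Rx wiring (Disposable.Create around cts.Cancel, combined with the source subscription in a CompositeDisposable) is left out. The first job to fault triggers Cancel, and every job that honours the token then ends up Canceled.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancelOnFirstError
{
    // Runs every job with a shared token and cancels the rest as soon as
    // any job faults. Hypothetical helper; the CancellationTokenSource is
    // not disposed here, to keep the sketch short.
    public static Task[] Run(Func<CancellationToken, Task>[] jobs)
    {
        var cts = new CancellationTokenSource();
        var tasks = jobs.Select(job => Task.Run(() => job(cts.Token), cts.Token)).ToArray();

        foreach (var t in tasks)
        {
            // The first fault triggers Cancel; later Cancel calls have no further effect.
            t.ContinueWith(_ => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);
        }
        return tasks;
    }
}
```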