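【Posted】: 2011-02-01 20:38:07
【Question】:
I'm trying to implement an asynchronous workflow with Rx, but I seem to be going about it completely wrong.
What I want to do is: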
1. Start with an undefined asynchronous stream of unparsed message strings (i.e. an IObservable<string>).
2. Parse the message strings asynchronously, but preserve their order (IObservable<Message>).
3. Batch up the parsed Messages in groups of 100 or so (IObservable<IEnumerable<Message>>).
4. Send each batch, once complete, to the UI thread to be processed. Batches must arrive in the same order they were started.
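I can't seem to get the ordering preserved, and Rx also doesn't seem to do things asynchronously the way I expected.
I tried using an IEnumerable instead of an IObservable to preserve order, and then calling the .AsParallel().AsOrdered() operators on it. Here is the code. See the comments in the code for the issues I'm running into: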
    private IObservable<IEnumerable<Message>> messageSource;
    public IObservable<IEnumerable<Message>> MessageSource { get { return messageSource; } }

    /// <summary>
    /// Sub-classes of MessageProviderBase provide this IEnumerable to
    /// generate unparsed message strings synchronously
    /// </summary>
    protected abstract IEnumerable<string> UnparsedMessages { get; }

    public MessageProviderBase()
    {
        // individual parsed messages as a PLINQ query
        var parsedMessages = from unparsedMessage in UnparsedMessages.AsParallel().AsOrdered()
                             select ParseMessage(unparsedMessage);

        // convert the above PLINQ query to an observable, buffering up to 100 messages at a time
        var batchedMessages
            = parsedMessages.ToObservable().BufferWithTimeOrCount(TimeSpan.FromMilliseconds(200), 100);

        // ISSUE #1:
        // batchedMessages seems to call OnNext before all of the messages in its buffer are parsed.
        // If you convert the IObservable<Message> it generates to an enumerable, it blocks
        // when you try to enumerate it.

        // Convert each batch to an IEnumerable
        // ISSUE #2: Even if the following Rx query were to run asynchronously (it doesn't now, see the above comment),
        // it could still deliver messages out of order. Only, instead of delivering individual
        // messages out of order, the message batches themselves could arrive out of order.
        messageSource = from messageBatch in batchedMessages
                        select messageBatch.ToEnumerable().ToList();
    }
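The ordering requirement on its own can be illustrated without Rx. The following is a minimal sketch, not a fix for the code above. It assumes a finite source, uses a hypothetical `ParseMessage` stand-in that returns an `int` after a variable delay, and omits the hand-off to the UI thread. The class and method names (`OrderedBatchSketch`, `ParseInBatchesAsync`) are made up for this example. It starts one task per message in arrival order, then awaits the tasks in that same order, so parsing overlaps while results and batches stay ordered.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class OrderedBatchSketch
{
    // Hypothetical stand-in for the real parser. The delay varies per
    // message so that later messages can finish before earlier ones.
    public static int ParseMessage(string raw)
    {
        int value = int.Parse(raw);
        Thread.Sleep((value % 7) * 2);
        return value;
    }

    // Starts parsing every message on the thread pool, then awaits the
    // tasks in arrival order and groups the results into batches.
    public static async Task<List<List<int>>> ParseInBatchesAsync(
        IEnumerable<string> unparsed, int batchSize)
    {
        // ToList() forces every task to be started now, in source order.
        // This is why the sketch only works for a finite source.
        List<Task<int>> pending = unparsed
            .Select(raw => Task.Run(() => ParseMessage(raw)))
            .ToList();

        var batches = new List<List<int>>();
        var current = new List<int>(batchSize);
        foreach (Task<int> task in pending)
        {
            // Awaiting in start order keeps message order even when a
            // later task has already completed.
            current.Add(await task);
            if (current.Count == batchSize)
            {
                batches.Add(current);
                current = new List<int>(batchSize);
            }
        }
        if (current.Count > 0)
        {
            batches.Add(current); // final, partially filled batch
        }
        return batches;
    }

    public static void Main()
    {
        IEnumerable<string> source =
            Enumerable.Range(0, 250).Select(i => i.ToString());
        List<List<int>> batches =
            ParseInBatchesAsync(source, 100).GetAwaiter().GetResult();

        foreach (List<int> batch in batches)
        {
            Console.WriteLine("batch of {0}: first = {1}, last = {2}",
                batch.Count, batch[0], batch[batch.Count - 1]);
        }
    }
}
```

A batch is only added to the result once every task in it has been awaited, which is the "send each batch when complete" behaviour described in the question. An unbounded IObservable<string> source would need a different mechanism for starting tasks as messages arrive, which this sketch does not attempt.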
【Comments】:
Tags: c# .net wpf system.reactive