@supertopia 的回答是正确的。
为了好玩,我想我会创建一个扩展方法ToOrdinalOrder 来强制输出顺序,然后可以将其与SelectMany 拼接在一起以获得输出顺序与输入顺序匹配的SelectMany 版本。
public static class Extensions
{
public static IObservable<T> ToOrdinalOrder<T>(this IObservable<T> source, Func<T, int> indexSelector)
{
return source
.Scan((expectedIndex: 0, heldItems: ImmutableDictionary<int, T>.Empty, toReturn: Observable.Empty<T>()), (state, item) =>
{
var itemIndex = indexSelector(item);
if (itemIndex == state.expectedIndex)
{
var toReturn = Observable.Return(item);
var expectedIndex = itemIndex + 1;
var heldItems = state.heldItems;
while (heldItems.ContainsKey(expectedIndex))
{
toReturn = toReturn.Concat(Observable.Return(heldItems[expectedIndex]));
heldItems = heldItems.Remove(expectedIndex);
expectedIndex++;
}
return (expectedIndex, heldItems, toReturn);
}
else
return (state.expectedIndex, state.heldItems.Add(itemIndex, item), Observable.Empty<T>());
})
.SelectMany(t => t.toReturn);
}
public static IObservable<TResult> OrderedSelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
{
return source
.SelectMany(async (TSource item, int index) => (await selector(item), index))
.ToOrdinalOrder(t => t.Item2)
.Select(t => t.Item1);
}
}
这里有一些示例代码来演示用法:
var source = new Subject<string>();
source
.Select((item, index) => (item, index)) //index only required for logging purposes.
.OrderedSelectMany(async t =>
{
var item = t.Item1;
var index = t.Item2;
Console.WriteLine($"Starting item {item}, index {index}.");
switch (index)
{
case 0:
await Task.Delay(TimeSpan.FromMilliseconds(50));
Console.WriteLine($"Completing item {item}, index {index}.");
return (item, index);
case 1:
await Task.Delay(TimeSpan.FromMilliseconds(200));
Console.WriteLine($"Completing item {item}, index {index}.");
return (item, index);
case 2:
await Task.Delay(TimeSpan.FromMilliseconds(20));
Console.WriteLine($"Completing item {item}, index {index}.");
return (item, index);
default:
throw new NotImplementedException();
}
})
.Subscribe(s => Console.WriteLine($"Received item '{s}'."));
source.OnNext("A");
source.OnNext("B");
source.OnNext("C");
source.OnCompleted();
输出如下所示:
Starting item A, index 0.
Starting item B, index 1.
Starting item C, index 2.
Completing item C, index 2.
Completing item A, index 0.
Received item '(A, 0)'.
Completing item B, index 1.
Received item '(B, 1)'.
Received item '(C, 2)'.