【问题标题】:SelectMany with await and ordering of resultsSelectMany 与等待和结果排序
【发布时间】:2017-06-21 13:06:52
【问题描述】:

Deep Dive into Rx SelectMany的文章中,作者在最后的注释中提到了以下内容;

注意:为了缓解订购问题,SelectMany() 带有一个 采用带有签名Func<TSource, int, Task<TResult>> 的选择器的重载。

谁能告诉我这是如何工作的?

【问题讨论】:

标签: c# .net system.reactive reactive-programming


【解决方案1】:

@supertopia 的回答是正确的。

为了好玩,我想我会创建一个扩展方法ToOrdinalOrder 来强制输出顺序,然后可以将其与SelectMany 拼接在一起以获得输出顺序与输入顺序匹配的SelectMany 版本。

public static class Extensions
{
    public static IObservable<T> ToOrdinalOrder<T>(this IObservable<T> source, Func<T, int> indexSelector)
    {
        return source
            .Scan((expectedIndex: 0, heldItems: ImmutableDictionary<int, T>.Empty, toReturn: Observable.Empty<T>()), (state, item) =>
            {
                var itemIndex = indexSelector(item);
                if (itemIndex == state.expectedIndex)
                {
                    var toReturn = Observable.Return(item);
                    var expectedIndex = itemIndex + 1;
                    var heldItems = state.heldItems;
                    while (heldItems.ContainsKey(expectedIndex))
                    {
                        toReturn = toReturn.Concat(Observable.Return(heldItems[expectedIndex]));
                        heldItems = heldItems.Remove(expectedIndex);
                        expectedIndex++;
                    }
                    return (expectedIndex, heldItems, toReturn);
                }
                else
                    return (state.expectedIndex, state.heldItems.Add(itemIndex, item), Observable.Empty<T>());
            })
            .SelectMany(t => t.toReturn);
    }

    public static IObservable<TResult> OrderedSelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        return source
            .SelectMany(async (TSource item, int index) => (await selector(item), index))
            .ToOrdinalOrder(t => t.Item2)
            .Select(t => t.Item1);
    }
}

这里有一些示例代码来演示用法:

var source = new Subject<string>();

source
    .Select((item, index) => (item, index)) //index only required for logging purposes. 
    .OrderedSelectMany(async t =>
    {
        var item = t.Item1;
        var index = t.Item2;
        Console.WriteLine($"Starting item {item}, index {index}.");
        switch (index)
        {
            case 0:
                await Task.Delay(TimeSpan.FromMilliseconds(50));
                Console.WriteLine($"Completing item {item}, index {index}.");
                return (item, index);
            case 1:
                await Task.Delay(TimeSpan.FromMilliseconds(200));
                Console.WriteLine($"Completing item {item}, index {index}.");
                return (item, index);
            case 2:
                await Task.Delay(TimeSpan.FromMilliseconds(20));
                Console.WriteLine($"Completing item {item}, index {index}.");
                return (item, index);
            default:
                throw new NotImplementedException();
        }
    })
    .Subscribe(s => Console.WriteLine($"Received item '{s}'."));

source.OnNext("A");
source.OnNext("B");
source.OnNext("C");
source.OnCompleted();

输出如下所示:

Starting item A, index 0.
Starting item B, index 1.
Starting item C, index 2.
Completing item C, index 2.
Completing item A, index 0.
Received item '(A, 0)'.
Completing item B, index 1.
Received item '(B, 1)'.
Received item '(C, 2)'.

【讨论】:

  • ValueTuples 很棒。难道你不能在OrderedSelectMany 中也写.ToOrdinalOrder(t =&gt; t.index).Select(t =&gt; t.item); 吗?
  • C# 7.0 是我现在使用的。使用它,我必须在上一行中明确命名元组成员。在 7.1 中,您所描述的可能是可能的。
  • @Shlomo 感谢您抽出时间帮我敲出一些代码,非常感谢!
【解决方案2】:

在上述重载的元数据描述中说

//   selector:
//     A transform function to apply to each element; the second parameter of the function
//     represents the index of the source element.

在选择器函数中,您可以从源访问通知的值和原始索引。

例如如果您需要处理一堆值并知道特定源值的工作何时完成。

public static IObservable<int> WorkAndReportIndex<TSource>(this IObservable<TSource> source)
{
    Func<TSource, int, Task<int>> selector = async (value, index) =>
    {
        await SomeWork(value); 
        return index;
    };

    return source.SelectMany(selector);
}

【讨论】:

    猜你喜欢
    • 2020-07-17
    • 1970-01-01
    • 1970-01-01
    • 2013-08-31
    • 2015-10-24
    • 2019-04-19
    • 1970-01-01
    • 2017-10-30
    • 2017-04-26
    相关资源
    最近更新 更多