【问题标题】:Convert IDictionary<T, Task<bool>> to IObservable<KeyValuePair<T, bool>>将 IDictionary<T, Task<bool>> 转换为 IObservable<KeyValuePair<T, bool>>
【发布时间】:2016-03-31 15:56:37
【问题描述】:

我有一个功能可以用另一个实体替换存储库(例如数据库)中的单个实体。这将返回一个任务,其结果指示替换是否成功。

// Single Replace
Task<bool> Replace(Entity e, Entity Other);

现在说我想创建相同的功能,但允许替换多个实体。我对函数签名的想法是:

// Multiple Replace
IObservable<KeyValuePair<Entity, bool>> Replace(IDictionary<Entity, Entity> Map);

Map 包含一个字典,其键是要替换的实体,其值是要替换旧实体的新实体。

这个Map字典的每个KeyValuePair都应该返回一个KeyValuePair,其中key对应Map.Key(=要被替换的Entity),Value是一个bool,表示替换成功还是不是。

我想将此作为流返回,因此我选择了 IObservable>,为每个可用的结果推出一个 KeyValuePair。

现在,我只想让“多重替换”功能使用“单一替换”功能进行转换;但是我怎样才能进行这些调用并以所需的格式返回结果呢?

Task<bool> Replace(Entity e, Entity Other)
{
  // ... do the work ...
}

IObservable<KeyValuePair<Entity, bool>> Replace(IDictionary<Entity, Entity> Map)
{
  // How this should work:
  // for each KeyValuePair in Map, call the Single Replace function
  // and - once available - return its result (bool) mapped to the Entity in the
  // resulting Observable stream

  IDictionary<Entity, Task<bool>> dict = Map.ToDictionary(x => x.Key, x => Replace(x.Key, x.Value));

  // .. now what? How can I now convert the dict into an IObservable<KeyValuePair<Entity,bool>> ?

}

不胜感激!

【问题讨论】:

    标签: c# asynchronous system.reactive


    【解决方案1】:

    如果没有一个好的Minimal, Complete, and Verifiable example 清楚地显示你在做什么,尤其是你打算如何实现从你的方法返回的IObservable&lt;T&gt; 对象,就不可能提供具体的建议。不过,我可以提供一些建议,至少可以为您指明一个有用的方向。

    首先,我建议使用Task&lt;Tuple&lt;Entity, bool&gt;&gt;(或Task&lt;KeyValuePair&lt;Entity, bool&gt;&gt;,虽然我不太喜欢将字典相关类型用于非字典相关代码......即你在哪里实际上只是配对事物而不是真正的键/值配对)。这样,当任务完成时,您就可以从任务本身中知道您需要的所有信息。

    这可以通过一个简单的辅助方法来完成,例如:

    async Task<Tuple<Entity, bool>> ReplaceAsync(Entity e, Entity other)
    {
        return Tuple.Create(e, await Replace(e, other));
    }
    

    我在这里使用了Tuple,当然你可以为此目的创建一个命名类型,这样做实际上可能有助于代码的可读性。

    完成此操作后,您可以枚举输入字典(或其他更合适的集合 :) 中的所有项目,然后使用 Task.WhenAny() 迭代地删除并发布每个已完成的项目:

    IObservable<Tuple<Entity, bool>> Replace(IDictionary<Entity, Entity> Map)
    {
        Task<Tuple<Entity, bool>>[] tasks = new Task<Tuple<Entity, bool>>(Map.Count);
        int i = 0;
    
        foreach (var kvp in Map)
        {
            tasks[i++] = ReplaceAsync(kvp.Key, kvp.Value);
        }
    
        // Create the IObservable object somehow. Make sure it has the
        // array of tasks in it. E.g. (see below for TaskObservable example)
    
        TaskObservable<Tuple<Entity, bool>> observable =
            new TaskObservable<Tuple<Entity, bool>>(tasks);
    
        // Now, start running the observable object. Note: not really all that great
        // to ignore the returned Task object but without more context in your question
        // I can't offer anything more specific. You will probably want to store the
        // Task object *somewhere*, await it, wrap all that in try/catch, etc. to
        // make sure you are correctly monitoring the progress of the task.
        var _ = observable.Run();
    
        return observable;
    }
    

    IObservable&lt;T&gt; 可能类似于:

    class TaskObservable<T> : IObservable<T>
    {
        private class ObserverItem : IDisposable
        {
            public readonly IObserver<T> Observer;
            public readonly TaskObservable<T> Owner;
    
            public ObserverItem(IObserver<T> observer, TaskObservable<T> owner)
            {
                Observer = observer;
                Owner = owner;
            }
    
            public void Dispose()
            {
                if (!Owner._observerItems.Contains(this))
                {
                    throw new InvalidOperationException(
                        "This observer is no longer subscribed");
                }
    
                Owner._observerItems.Remove(this);
            }
        }
    
        private readonly Task<T>[] _tasks;
        private readonly List<ObserverItem> _observerItems = new List<ObserverItem>();
    
        public TaskObservable(Task<T>[] tasks)
        {
            _tasks = tasks;
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            ObserverItem item = new ObserverItem(observer, this);
    
            _observerItems.Add(item);
            return item;
        }
    
        private void Publish(T t)
        {
            foreach (ObserverItem item in _observerItems)
            {
                item.Observer.OnNext(t);
            }
        }
    
        private void Complete()
        {
            foreach (ObserverItem item in _observerItems)
            {
                item.Observer.OnCompleted();
            }
        }
    
        async Task Run()
        {
            for (int i = 0; i < _tasks.Length; i++)
            {
                Task<T> completedTask = await Task.WhenAny(tasks);
    
                Publish(completedTask.Result);
            }
    
            Complete();
        }
    }
    

    【讨论】:

    • 感谢您深思熟虑的回答彼得,这对阅读很有帮助且很有趣-我从您的 TaskObservable 课程中学到了很多东西。而且我认为转换为 Task> 确实是一个关键点——我认为这样我什至可以完全避免像 TaskObservable 这样的类,同时具有方便的错误处理。将在下面发布,让我知道您的想法。
    【解决方案2】:

    基于 Peter 的解决方案,利用 Task.ToObservable 稍作修改:

        IObservable<Tuple<Entity, bool>> Replace(IDictionary<Entity, Entity> Map)
        {
            Task<Tuple<Entity, bool>>[] tasks = new Task<Tuple<Entity, bool>>(Map.Count);
            int i = 0;
    
            foreach (var kvp in Map)
            {
                tasks[i++] = ReplaceAsync(kvp.Key, kvp.Value);
            }
    
            return tasks
                .Select(x => x.ToObservable())
                .Merge();   // or use .Concat for sequential execution
        }
    

    似乎工作正常,有人发现任何潜在问题吗?

    使用 .Merge 会提供一个 IObservable,它可以并行启动任务,在它们完成时提供结果,并在所有任务完成后完成。

    或者我可以使用 .Concat() 顺序执行它们,即首先启动第一个任务,等待它完成,然后启动第二个任务,等等。

    我相信错误处理是免费的,所以每当任何任务抛出异常时,都会自动调用 IObservable 的 OnError。

    【讨论】:

    • 我对响应式扩展不是很熟悉——事实上,我什至不知道 ToObservable()Merge() 方法,它们似乎确实简化了很多场景.我唯一担心的可能是上面创建了一个可观察对象序列,而不是一个序列上的可观察对象;后者可能稍微更有效率。但是恕我直言,代码的清晰性更为重要,因此您需要先证明一个真正重要的性能问题,然后再担心这一点。
    • 无论如何,我的示例必然会遗漏诸如错误处理之类的细节,这只会进一步破坏使用它而不是内置运算符的任何动机。很高兴我的回答的要点将您带到了您需要去的地方。 :)
    【解决方案3】:

    我不确定我是否遗漏了什么,但我认为这可能对你有用:

    public IObservable<KeyValuePair<T, bool>> Convert<T>(IDictionary<T, Task<bool>> source)
    {
        return
            Observable
                .Create<KeyValuePair<T, bool>>(o =>
                    (
                        from s in source.ToObservable()
                        from v in s.Value.ToObservable()
                        select new KeyValuePair<T, bool>(s.Key, v)
                    ).Subscribe(o));
    }
    

    【讨论】:

      猜你喜欢
      • 2013-06-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多