如果没有一个好的Minimal, Complete, and Verifiable example 清楚地显示你在做什么,尤其是你打算如何实现从你的方法返回的IObservable<T> 对象,就不可能提供具体的建议。不过,我可以提供一些建议,至少可以为您指明一个有用的方向。
首先,我建议使用Task<Tuple<Entity, bool>>(或Task<KeyValuePair<Entity, bool>>,虽然我不太喜欢将字典相关类型用于非字典相关代码......即你在哪里实际上只是配对事物而不是真正的键/值配对)。这样,当任务完成时,您就可以从任务本身中知道您需要的所有信息。
这可以通过一个简单的辅助方法来完成,例如:
async Task<Tuple<Entity, bool>> ReplaceAsync(Entity e, Entity other)
{
return Tuple.Create(e, await Replace(e, other));
}
我在这里使用了Tuple,当然你可以为此目的创建一个命名类型,这样做实际上可能有助于代码的可读性。
完成此操作后,您可以枚举输入字典(或其他更合适的集合 :) 中的所有项目,然后使用 Task.WhenAny() 迭代地删除并发布每个已完成的项目:
IObservable<Tuple<Entity, bool>> Replace(IDictionary<Entity, Entity> Map)
{
Task<Tuple<Entity, bool>>[] tasks = new Task<Tuple<Entity, bool>>(Map.Count);
int i = 0;
foreach (var kvp in Map)
{
tasks[i++] = ReplaceAsync(kvp.Key, kvp.Value);
}
// Create the IObservable object somehow. Make sure it has the
// array of tasks in it. E.g. (see below for TaskObservable example)
TaskObservable<Tuple<Entity, bool>> observable =
new TaskObservable<Tuple<Entity, bool>>(tasks);
// Now, start running the observable object. Note: not really all that great
// to ignore the returned Task object but without more context in your question
// I can't offer anything more specific. You will probably want to store the
// Task object *somewhere*, await it, wrap all that in try/catch, etc. to
// make sure you are correctly monitoring the progress of the task.
var _ = observable.Run();
return observable;
}
IObservable<T> 可能类似于:
class TaskObservable<T> : IObservable<T>
{
private class ObserverItem : IDisposable
{
public readonly IObserver<T> Observer;
public readonly TaskObservable<T> Owner;
public ObserverItem(IObserver<T> observer, TaskObservable<T> owner)
{
Observer = observer;
Owner = owner;
}
public void Dispose()
{
if (!Owner._observerItems.Contains(this))
{
throw new InvalidOperationException(
"This observer is no longer subscribed");
}
Owner._observerItems.Remove(this);
}
}
private readonly Task<T>[] _tasks;
private readonly List<ObserverItem> _observerItems = new List<ObserverItem>();
public TaskObservable(Task<T>[] tasks)
{
_tasks = tasks;
}
public IDisposable Subscribe(IObserver<T> observer)
{
ObserverItem item = new ObserverItem(observer, this);
_observerItems.Add(item);
return item;
}
private void Publish(T t)
{
foreach (ObserverItem item in _observerItems)
{
item.Observer.OnNext(t);
}
}
private void Complete()
{
foreach (ObserverItem item in _observerItems)
{
item.Observer.OnCompleted();
}
}
async Task Run()
{
for (int i = 0; i < _tasks.Length; i++)
{
Task<T> completedTask = await Task.WhenAny(tasks);
Publish(completedTask.Result);
}
Complete();
}
}