Rx 确实非常适合 IMO 这个问题。
IObservables 不能“OrderBy”,原因很明显(您必须首先观察整个流以保证正确的输出顺序),所以我在下面的回答假设(您声明)您的 2 源事件流是有序的。
最后这是一个有趣的问题。标准的 Rx 运算符缺少一个 GroupByUntilChanged ,只要它在观察到下一组的第一个元素时在前一组可观察到的上调用 OnComplete 就可以轻松解决这个问题。然而,查看DistinctUntilChanged 的实现,它并没有遵循这种模式,只在源 observable 完成时调用OnComplete(即使它知道在第一个非不同元素之后不会有更多元素......奇怪吗? ??)。无论如何,出于这些原因,我决定不使用 GroupByUntilChanged 方法(不违反 Rx 约定),而是使用 ToEnumerableUntilChanged。
免责声明:这是我的第一个 Rx 扩展,因此非常感谢您对我的选择的反馈。此外,我主要关心的一个问题是持有 distinctElements 列表的匿名 observable。
首先,您的应用程序代码非常简单:
public class Event
{
public DateTime Timestamp { get; set; }
}
private IObservable<Event> eventStream1;
private IObservable<Event> eventStream2;
public IObservable<IEnumerable<Event>> CombineAndGroup()
{
return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
.ToEnumerableUntilChanged(e => e.Timestamp);
}
现在是 ToEnumerableUntilChanged 实现(代码警告墙):
public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
{
// TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
var comparer = EqualityComparer<TKey>.Default;
return Observable.Create<IEnumerable<TSource>>(observer =>
{
var currentKey = default(TKey);
var hasCurrentKey = false;
var distinctElements = new List<TSource>();
return source.Subscribe((value =>
{
TKey elementKey;
try
{
elementKey = keySelector(value);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (!hasCurrentKey)
{
hasCurrentKey = true;
currentKey = elementKey;
distinctElements.Add(value);
return;
}
bool keysMatch;
try
{
keysMatch = comparer.Equals(currentKey, elementKey);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (keysMatch)
{
distinctElements.Add(value);
return;
}
observer.OnNext( distinctElements);
distinctElements.Clear();
distinctElements.Add(value);
currentKey = elementKey;
}), observer.OnError, () =>
{
if (distinctElements.Count > 0)
observer.OnNext(distinctElements);
observer.OnCompleted();
});
});
}