【问题标题】:How can I combine two streams ordered then grouped by timestamp?如何组合两个按时间戳排序然后分组的流?
【发布时间】:2012-03-17 09:31:40
【问题描述】:

我有两个对象流,每个对象都有一个Timestamp 值。两个流都是按顺序排列的,因此例如时间戳在一个流中可能是 Ta = 1,3,6,6,7 而在另一个流中可能是 Tb = 1,2,5,5,6,8。两个流中的对象属于同一类型。

我想做的是将这些事件中的每一个按时间戳顺序放置在总线上,即放置 A1,然后放置 B1、B2、A3 等。此外,由于某些流具有多个具有相同时间戳的(顺序)元素,因此我希望将这些元素分组,以便每个新事件都是一个数组。所以我们将 [A3] 放在总线上,然后是 [A15,A25] 等等。

我试图通过创建两个ConcurrentQueue 结构来实现这一点,将每个事件放在队列的后面,然后查看队列的每个前面,首先选择较早的事件,然后遍历队列,这样所有存在具有此时间戳的事件。

但是,我遇到了两个问题:

  • 如果我让这些队列不受限制,我很快就会耗尽内存,因为读取操作比接收事件的处理程序快得多。 (我有几 GB 的数据)。
  • 我有时会遇到这样一种情况,即我在 A2515 /sub> 到了。我不知何故需要提防这种情况。

我认为 Rx 可以在这方面提供帮助,但我没有看到明显的组合器可以使这成为可能。因此,非常感谢任何建议。

【问题讨论】:

  • 您有 63 个问题没有被接受的答案。
  • @MattGrande 你是说我问的 ?s 太难了吗?
  • 您看过 TPL 数据流吗? microsoft.com/download/en/…
  • @DmitriNesteruk 他建议你应该接受正确的答案。

标签: c# system.reactive complex-event-processing


【解决方案1】:

Rx 确实非常适合 IMO 这个问题。

IObservables 不能“OrderBy”,原因很明显(您必须首先观察整个流以保证正确的输出顺序),所以我在下面的回答假设(您声明)您的 2 源事件流是有序的。

最后这是一个有趣的问题。标准的 Rx 运算符缺少一个 GroupByUntilChanged ,只要它在观察到下一组的第一个元素时在前一组可观察到的上调用 OnComplete 就可以轻松解决这个问题。然而,查看DistinctUntilChanged 的实现,它并没有遵循这种模式,只在源 observable 完成时调用OnComplete(即使它知道在第一个非不同元素之后不会有更多元素......奇怪吗? ??)。无论如何,出于这些原因,我决定不使用 GroupByUntilChanged 方法(不违反 Rx 约定),而是使用 ToEnumerableUntilChanged

免责声明:这是我的第一个 Rx 扩展,因此非常感谢您对我的选择的反馈。此外,我主要关心的一个问题是持有 distinctElements 列表的匿名 observable。

首先,您的应用程序代码非常简单:

    public class Event
    {
        public DateTime Timestamp { get; set; }
    }

    private IObservable<Event> eventStream1;
    private IObservable<Event> eventStream2; 

    public IObservable<IEnumerable<Event>> CombineAndGroup()
    {
        return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
            .ToEnumerableUntilChanged(e => e.Timestamp);
    }

现在是 ToEnumerableUntilChanged 实现(代码警告墙):

    public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
    {
        // TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
        var comparer = EqualityComparer<TKey>.Default;

        return Observable.Create<IEnumerable<TSource>>(observer =>
        {
            var currentKey = default(TKey);
            var hasCurrentKey = false;
            var distinctElements = new List<TSource>();

            return source.Subscribe((value =>
            {
                TKey elementKey;
                try
                {
                    elementKey = keySelector(value);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (!hasCurrentKey)
                {
                    hasCurrentKey = true;
                    currentKey = elementKey;
                    distinctElements.Add(value);
                    return;
                }

                bool keysMatch;
                try
                {
                    keysMatch = comparer.Equals(currentKey, elementKey);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (keysMatch)
                {
                    distinctElements.Add(value);
                    return;
                }

                observer.OnNext( distinctElements);

                distinctElements.Clear();
                distinctElements.Add(value);
                currentKey = elementKey;

            }), observer.OnError, () =>
            {
                if (distinctElements.Count > 0)
                    observer.OnNext(distinctElements);

                observer.OnCompleted();
            });
        });
    }

【讨论】:

  • 一个虚拟的 +1:我通读了代码,看起来它会像宣传的那样执行......非常好! :D
  • 哇,我希望我能不止一次 +1。谢谢!
猜你喜欢
  • 1970-01-01
  • 2013-01-26
  • 1970-01-01
  • 2016-01-15
  • 2015-06-06
  • 1970-01-01
  • 2023-02-13
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多