【问题标题】:How to generate IObservable index delta on the go?如何在旅途中生成 IObservable 索引增量?
【发布时间】:2012-11-20 12:25:46
【问题描述】:

我有两个数据源。
其中一个是缓存列表,另一个是通过IObservable<T>推送的新数据。

我想使用 Rx 找出需要对缓存列表 A 执行哪些操作,以使其顺序和内容与新数据相同。

我正在寻找一个函数,它接受 IEnumerable<T> aIObservable<T> b 并返回一个可观察对象,a 上推送操作(插入和删除),这将使其与 b 相同,而无需等待b 完成。

注意:我知道我不能修改列表或 observable。我不想。

我只想知道什么操作,以什么顺序,只要这些操作变得已知。

ab 都是唯一且已排序的,T 实现了 IComparable<T>IEquatable<T>

public static IObservable<Tuple<int, bool>> IndexDelta<T>(
    IEnumerable<T> a,
    IObservable<T> b
) where T : IEquatable<T>, IComparable<T> {
    // ???
}

我将在我的示例中使用ints。

什么?!

考虑这两个序列:

A: [150, 100, 70, 30, 20]
B: [300, 200, 100, 70, 60, 50, 20]

目标是找到一系列将A转换为B的删除/插入操作。认为A是缓存数据源,B是新数据,我想知道如何将这些更新转换为网格无需重新加载。

行在两个来源中排序。

我希望输出在表单中

[(0, true), (1, true), (0, false), (3, false), (4, true), (5, true)]

我稍后会按布尔标志对这些操作进行分组:

deleted:  [0, 3]
inserted: [0, 1, 4, 5]

这将翻译成人类语言

  1. 删除A0和A3

    A = [150, 100, 70, 30, 20] = [100, 70, 20]

  2. 将B0、B1、B4、B5插入A:

    A = [300, 200, 100, 70, 60, 50, 20]

  3. 现在 A 与 B 相同。

要求

我要注意几件重要的事情:

  1. A 是保证不会更改的列表。 B 是一个冷的 observable,它需要一些时间才能完成,但很快就会产生第一个项目。因此,结果 observable需要在有足够数据可用时立即推送。

  2. 项目在两个来源中保证是唯一的IEquatable&lt;T&gt;

  3. 项目是不可变的,保证在两个来源中都使用IComparable&lt;T&gt; 降序排序

  4. 最好优化添加到 B 左侧的新项目。这是最常见的情况。然而,考虑到它们的时间戳是适当的(不会破坏排序),项目可能会被删除或插入到任何其他位置。想想 iPhone 相机胶卷。

  5. (*) 如果可能的话,我对纯函数解决方案感兴趣。

伪代码草图

我草拟了一个伪代码算法,它以命令式的方式实现这一点。

我编造了CurrentMoveNextawaityield push 语义,但这个想法应该有点道理。

IObservable<Tuple<int, bool>> IndexDelta(a, b)
{
    var indexA = 0;
    var indexB = 0;

    while (true) {
        var headA = a.Current;
        var headB = b.Current; 

        if (headA == null && headB == null) {
            return yield break; // both sequences are over
        }

        var reportDeletion = () => {
            yield push Tuple.Create(indexA, false);
            await a.MoveNext(); // this one is fast
        }

        var reportInsertion = () => {
            yield push Tuple.Create(indexB, true);
            await b.MoveNext(); // can take a long time
        }

        if (headA == null) { // No source item at this position
            reportInsertion();
            continue;
        }

        if (headB == null) { // No fetched item at this position
            reportDeletion();
            continue;
        }

        switch (headB.CompareTo(headA)) {
            case 0:
                yield continue;
                break;
            case 1: // Fetched item is newer than source item
                reportInsertion();
                break; 
            case -1: // Source item is newer than fetched item
                reportDeletion();
                break; 
        }

        indexA++;
        indexB++;
    }
} 

我相信你可以用Subject&lt;T&gt; 实现一些非常相似的东西。但是我不想继续这个解决方案,因为我想知道是否可以纯粹通过组合 Rx 函数来解决它,例如 AggregateZipCombineLatest

你有什么想法?

【问题讨论】:

  • 这不太合理。您谈论 IEnumerable 或 IObservable 上的插入和删除,但这些不是列表。您只能从中获得下一个价值。您不能从中插入或删除。您的伪代码草图也完全错误,因为 IObservable 不能从具有 yield 的函数创建。只有 IEnumerables 可以使用 yield 神奇地生成。你了解 IEnumerable 和 IOBservable 之间的区别以及它们的用途吗?
  • @brad:我的伪代码是 pseudo 是有原因的。是的,我知道我不能这样做,我只是概述了可以通过推送到 Subject 来实现的算法,如果可以使用纯 Rx,我不想使用它。
  • 但是你还是讲到IEnumerable的插入和删除。正如我所说,它不是一个列表。插入或删除到 IEnumerable 是什么意思。你是说列表吗?
  • 我可能还不够清楚:我不想转换我的输入。我不想更改ab。我想获得ab 在插入和删除方面的差异列表。
  • 你想要两个列表上的差异算法吗?

标签: c# linq system.reactive set-difference


【解决方案1】:

似乎工作...:

void Main()
{
    var a = new int?[] {150, 100, 70, 30, 20 };
    var b = new int?[] {300, 200, 100, 70, 60, 50, 20 };
    var result = IndexDelta(a, b);
    result.Dump();
}

// Define other methods and classes here
IObservable<Tuple<int, bool>> IndexDelta(IEnumerable<int?> a, IEnumerable<int?> b)
{
    var observable = Observable.Create<Tuple<int, bool>>(o => {
        var indexA = 0;
        var indexB = 0;
        var aEnumerator = a.GetEnumerator();
        var bEnumerator = b.GetEnumerator();
        var aHasNext = aEnumerator.MoveNext();
        var bHasNext = bEnumerator.MoveNext();

        while(true) {
            if (aHasNext == false && bHasNext == false) {
                "Completed".Dump();
                o.OnCompleted(); // both sequences are over
                break;
            }

            var headA = aEnumerator.Current;
            var headB = bEnumerator.Current; 

            headA.Dump("A");
            headB.Dump("B");

            Action reportDeletion = () => {
                o.OnNext(Tuple.Create(indexA, false));
                aHasNext = aEnumerator.MoveNext(); // this one is fast
            };
            Action reportInsertion = () => {
                o.OnNext(Tuple.Create(indexB, true));
                bHasNext = bEnumerator.MoveNext(); // can take a long time
            };

            if (headA == null) { // No source item at this position
                reportInsertion();
                continue;
            }

            if (headB == null) { // No fetched item at this position
                reportDeletion();
                continue;
            }   

            switch (headB.Value.CompareTo(headA.Value)) {
                case 0:     
                    aHasNext = aEnumerator.MoveNext();
                    bHasNext = bEnumerator.MoveNext();
                    indexA++;
                    indexB++;
                    break;
                case 1: // Fetched item is newer than source item
                    reportInsertion();
                    indexB++;
                    break; 
                case -1: // Source item is newer than fetched item
                    reportDeletion();
                    indexA++;
                    break; 
            }           
        }
        return Disposable.Empty;
    });     
    return observable;
} 

【讨论】:

  • 虽然存在一些问题,但还是很接近的:bIObservable(我想你可以在上面调用 ToEnumerable),项目不可为空,解决方案适用于 @ 987654325@s 而不是 T 其中T: IComparable
  • 我想知道是否可以通过 Rx 函数组合来做到这一点?成对分解两个可观察对象,聚合它们,不管怎样?
  • @DanAbramov 从一开始就创建 a 和 b IObservables 然后在两者上都使用 Subscribe(o) 而不是到处使用 MoveNext 和 Current 更容易。
  • 你会如何Subscribe(o) ?我不确定我是否在关注。
  • @DanAbramov 不要使用 GetEnumerator、Current 和 MoveNext,而是使用 Subscribe(o),其中 o 是传递给 Observable.Create 的 IObserver 参数(嗯,在传递给 Create() 的 Func 中) .这将需要进行很多更改,抱歉我现在没有时间,如果您仍然卡住,可能稍后再做。另外,是的,您可能可以组合其他运算符,例如合并两个列表,获取 Distinct 元素,将它们与两个列表进行比较等......再次,如果我稍后有时间,我会尝试更新这个答案。
【解决方案2】:

此代码基于Richard's answer,但适用于任何T
不过,我无法摆脱ToEnumerable 的诅咒——感谢任何帮助。

IObservable<Tuple<int, T, bool>> IndexDelta<T>(
    IObservable<T> first, IObservable<T> second
)
    where T : IComparable, IEquatable<T>
{
    return Observable.Create<Tuple<int, T, bool>> (o => {
        var a = first.ToEnumerable ().GetEnumerator ();
        var b = second.ToEnumerable ().GetEnumerator ();

        var indexA = -1;
        var indexB = -1;

        var hasNextA = true;
        var hasNextB = true;

        var headA = default(T);
        var headB = default(T);

        Action<bool> advanceA = (bool reportDeletion) => {
            if (reportDeletion) {
                o.OnNext (Tuple.Create (indexA, headA, false));
            }

            if (hasNextA = a.MoveNext ()) {
                indexA++;
                headA = a.Current;
            }
        };

        Action<bool> advanceB = (bool reportInsertion) => {
            if (reportInsertion) {
                o.OnNext (Tuple.Create (indexB, headB, true));
            }

            if (hasNextB = b.MoveNext ()) {
                indexB++;
                headB = b.Current;
            }
        };

        advanceA (false);
        advanceB (false);

        while (true) {
            if (!hasNextA && !hasNextB) {
                o.OnCompleted ();
                break;
            }

            if (!hasNextA) {
                advanceB (true);
                continue;
            } 

            if (!hasNextB) {
                advanceA (true);
                continue;
            } 

            switch (headA.CompareTo (headB)) {
            case 0:
                advanceA (false);
                advanceB (false);
                break;
            case 1:
                advanceA (true);
                break; 
            case -1:
                advanceB (true);
                break; 
            }          
        }

        return Disposable.Create (() => {
            a.Dispose ();
            b.Dispose ();
        });
    });     
} 

【讨论】:

  • 嗯,想解释一下否决票吗?我在生产中使用此代码,它工作得很好。
猜你喜欢
  • 1970-01-01
  • 2019-10-01
  • 1970-01-01
  • 1970-01-01
  • 2012-10-11
  • 2011-07-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多