【问题标题】:Reactive Rx zip queue in .Net.Net 中的反应式 Rx 压缩队列
【发布时间】:2018-05-17 16:04:04
【问题描述】:

我对反应式编程的概念相当陌生。我正在使用Bonsai,它通过 c# 公开了一些但不是全部的 .Net rx 命令。

我正在尝试获得类似大理石图的行为:

input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e

基本上,输入 2 会生成应存储在队列中的事件波。 输入 1 用作从该队列中发出单个项目的触发器。

当队列为空时,应该发出队列的最后一项。 我尝试了 zip 和 combineLatest 的各种组合,但无法获得所需的行为。

我还尝试了基于this postWithLatestFrom 实现,但回想起来这也不会产生预期的行为。

public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
            IObservable<TSource> source,
            IObservable<TOther> other)
        {


            // return source1.WithLatestFrom(source2, (xs, ys) => Tuple.Create(xs, ys));
            return source.Publish(os => other.Select(a => os.Select(b => Tuple.Create(b, a))).Switch());
        }

是否有任何运算符或运算符组合会产生这种行为?一旦我了解了使用哪些运算符,我就可以对 Bonsai 进行实施。

更新 1:2018/05/18

根据 Sentinel 的帖子,我在 Bonsai 命名空间内编写了一个新类 DiscriminatedUnion。我没有设法指定适当的类型。编译器声明“无法推断Merge 的类型参数”(在.Merge(input1.Select... 中)。 在哪里添加正确的类型规范?

using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;    
namespace Bonsai.Reactive
{
    [Combinator]
   // [XmlType(Namespace = Constants.XmlNamespace)]
    [Description("Implementation of Discriminated Union")]
    public class DiscriminatedUnion
    {
        public IObservable<int?> Process<TInput1, TInput2>(
           IObservable<TInput1> input1,
            IObservable<TInput2> input2)
        {
            var merged =
                        input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
                        .Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
                        .Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
                        {
                            int? next = state.Item1;
                            if (val.Item1 == 1)
                            {
                                if (state.Item2.Count > 0)
                                {
                                    next = state.Item2.Dequeue();
                                }
                            }
                            else
                            {
                                state.Item2.Enqueue(val.Item2);
                            }
                            return Tuple.Create(next, state.Item2, val.Item1);
                        })
                        .Where(x => (x.Item1 != null && x.Item3 == 1))
                        .Select(x => x.Item1);
            return merged;
        }
    }
}

【问题讨论】:

  • 修复我的答案。向下滚动查看第二个版本。
  • 又添加了一个答案,但建议您尝试使用该答案或等待 Shlomo 的分析:-D

标签: c# system.reactive


【解决方案1】:

这是您的问题的可测试表示(或弹珠图),使用 NuGet 包Microsoft.Reactive.Testing

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(1000.Ms(), 1),
    ReactiveTest.OnNext(2000.Ms(), 2),
    ReactiveTest.OnNext(3000.Ms(), 3),
    ReactiveTest.OnNext(4000.Ms(), 4),
    ReactiveTest.OnNext(5000.Ms(), 5),
    ReactiveTest.OnNext(6000.Ms(), 6),
    ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(1400.Ms(), "a"),
    ReactiveTest.OnNext(1500.Ms(), "b"),
    ReactiveTest.OnNext(1600.Ms(), "c"),
    ReactiveTest.OnNext(5500.Ms(), "d"),
    ReactiveTest.OnNext(5600.Ms(), "e"),
    ReactiveTest.OnNext(5700.Ms(), "f")
);

使用这种扩展方法:

public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

这个问题基本上是一个状态机问题,涉及两个不同类型的 observables。解决这个问题的最佳方法是使用 Discriminated Union 类型,它在 C# 中不存在,因此我们将创建一个。 @Sentinel 的回答是用元组做到的,这也可以:

public class DUnion<T1, T2>
{
    public DUnion(T1 t1) 
    { 
        Type1Item = t1;
        Type2Item = default(T2);
        IsType1 = true;
    }

    public DUnion(T2 t2) 
    { 
        Type2Item = t2;
        Type1Item = default(T1);
        IsType1 = false;
    }

    public bool IsType1 { get; }
    public bool IsType2 => !IsType1;

    public T1 Type1Item { get; }
    public T2 Type2Item { get; }
}

然后我们可以将两个不同类型的流SelectMerge 合并到一个有区别的联合流中,我们可以在其中使用Scan 管理状态。您的状态逻辑有点棘手,但可行:

  • 如果一个号码到达并且队列中没有项目,则什么也不做
  • 如果一个数字到达并且队列中有项目,则发出队列中的第一个项目。
    • 如果有多个项目,则从队列中删除最近的发射。
    • 如果队列只有一项,不要删除它,进入“假空”状态。
  • 如果一个字符串到达​​,将它放入队列中。
    • 如果队列为“fake-empty”,则弹出最后一项并退出“fake-empty”状态。

这是结果 observable(使用 NuGet 包 System.Collections.Immutable):

var result = input1.Select(i => new DUnion<int, string>(i))
    .Merge(input2.Select(s => new DUnion<int, string>(s)))
    .Scan((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), (state, dItem) => dItem.IsType1
        ? state.queue.IsEmpty   
            ? (state.queue, null, false, false)     //Is integer, but empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty //Is integer, at least one item: dequeue unless only one item, then emit either way
                ? (state.queue,           state.queue.Peek(), true,  true)
                : (state.queue.Dequeue(), state.queue.Peek(), false, true)
        : state.isFakeEmptyState //Is new string, just add to queue, don't emit
            ? (state.queue.Dequeue().Enqueue(dItem.Type2Item), null, false, false) 
            : (state.queue.Enqueue(dItem.Type2Item),   (string)null, false, false) 
    )
    .Where(t => t.emit)
    .Select(t => t.item);

然后可以进行如下测试:

var observer = scheduler.CreateObserver<string>();
result.Subscribe(observer);
scheduler.Start();
observer.Messages.Dump(); //Linqpad. Can replace with Console.Writeline loop.

更新:我考虑了一下,我认为围绕 Discriminate Union 功能抛出一些运算符是有意义的。这样您就不必显式处理类型:

public static class DUnionExtensions
{
    public class DUnion<T1, T2>
    {
        public DUnion(T1 t1)
        {
            Type1Item = t1;
            Type2Item = default(T2);
            IsType1 = true;
        }

        public DUnion(T2 t2)
        {
            Type2Item = t2;
            Type1Item = default(T1);
            IsType1 = false;
        }

        public bool IsType1 { get; }
        public bool IsType2 => !IsType1;

        public T1 Type1Item { get; }
        public T2 Type2Item { get; }
    }

    public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return a.Select(x => new DUnion<T1, T2>(x))
            .Merge(b.Select(x => new DUnion<T1, T2>(x)));
    }

    public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
            TState initialState,
            Func<TState, T1, TState> type1Handler,
            Func<TState, T2, TState> type2Handler)
        {
            return source.Scan(initialState, (state, u) => u.IsType1
                ? type1Handler(state, u.Type1Item)
                : type2Handler(state, u.Type2Item)
            );
        }
}

有了这些扩展方法,解决方案就变成了这样,我认为这样更好:

var result = input1
    .Union(input2)
    .ScanUnion((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), 
        (state, _) => state.queue.IsEmpty
            ? (state.queue, null, false, false)     //empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? (state.queue, state.queue.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : (state.queue.Dequeue(), state.queue.Peek(), false, true),
        (state, s) => state.isFakeEmptyState 
            ? (state.queue.Dequeue().Enqueue(s), null, false, false)
            : (state.queue.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.emit)
    .Select(t => t.item); 

如果您在命名元组语法上遇到问题,那么您可以使用旧的元组:

var result = input1
    .Union(input2)
    .ScanUnion(Tuple.Create(ImmutableQueue<string>.Empty, (string)null, false, false),
        (state, _) => state.Item1.IsEmpty
            ? Tuple.Create(state.Item1, (string)null, false, false)     //empty queue, so don't emit item
            : state.Item1.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? Tuple.Create(state.Item1, state.Item1.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : Tuple.Create(state.Item1.Dequeue(), state.Item1.Peek(), false, true),
        (state, s) => state.Item3
            ? Tuple.Create(state.Item1.Dequeue().Enqueue(s), (string)null, false, false)
            : Tuple.Create(state.Item1.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.Item4)
    .Select(t => t.Item2);

【讨论】:

  • 我喜欢你使用诸如歧视联合之类的术语清楚地解释它,这对我来说是新的。你能给我指点这类事情的学习参考吗,因为我觉得我很直观地解决了这些问题,不知道是否有更好的选择?
  • 另外....什么/为什么是 ImmutableQueue?我想我可能已经看过这门课了,并且上次想知道它
  • 非常感谢。我有两个问题来适应我的IObservable:1.) 我的输入类型是TSourceTOther,不能保证intstring。 2.) 我的编译器不知道queue。在.Scan((queue: ImmutableQueue&lt;string&gt;.Empty, ... 线上,我收到queue 在当前上下文中不存在的错误。它在.Empty 之后期望;}。除了System.Collections.Immutable,我还需要其他包吗?
  • 似乎我在命名参数语法上磕磕绊绊?
  • 更新了使用未命名元组的答案。 @jlarsch:您可以将intstring 替换为TSourceTOther。这些答案中没有任何内容使用 int 或 string 质量,除了 null (只需替换为 default(TSource) 或其他。
【解决方案2】:

这能解决问题吗?可能有一种更好的方法来执行此缓冲区,因此可能值得重新考虑。

        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(7000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));



        var merged =
            source2.Select(s2 => Tuple.Create(2, s2))
            .Merge(source1.Select(s1 => Tuple.Create(1, (int)s1)))
            .Scan(Tuple.Create((int?)null, new Queue<int>(),0), (state, val) =>
                 {
                     int? next = state.Item1;
                     if (val.Item1 == 1)
                     {
                         if (state.Item2.Count > 0)
                         {
                             next = state.Item2.Dequeue();
                         }
                     }
                     else
                     {
                         state.Item2.Enqueue(val.Item2);

                     }
                     return Tuple.Create(next, state.Item2,val.Item1);
                 })
            .Where(x=>(x.Item1!=null && x.Item3==1))
            .Select(x => x.Item1);



        merged.Subscribe(x => Console.WriteLine("Merged "+x));

更新 OP的固定代码:

 public class DiscriminatedUnion
{
    public static IObservable<TInput2> Process<TInput1, TInput2>(
       IObservable<TInput1> input1,
        IObservable<TInput2> input2)
    {
        var merged =
                    input2.Select(s2 => Tuple.Create(2, (object)s2))
                    .Merge(input1.Select(s1 => Tuple.Create(1, (object)s1)))
                    .Scan(Tuple.Create(default(TInput2), new Queue<TInput2>(), 0), (state, val) =>
                    {
                        TInput2 next = state.Item1;
                        if (val.Item1 == 1)
                        {
                            if (state.Item2.Count > 0)
                            {
                                next = state.Item2.Dequeue();
                            }
                        }
                        else
                        {
                            state.Item2.Enqueue((TInput2)val.Item2);
                        }
                        return Tuple.Create(next, state.Item2, val.Item1);
                    })
                    .Where(x => (!x.Item1.Equals(default(TInput2)) && x.Item3 == 1))
                    .Select(x => x.Item1);
        return merged;
    }
}

【讨论】:

  • 谢谢!作为 c# 的新手,我正在努力使用您的建议编写我的问题中描述的 public IObservable 类型的类。我会尝试和测试。
  • 打我一拳!我刚刚写了一些非常相似的东西。
  • @jlarsch 只是返回合并,这是 IObservable。要进行测试,请将其添加到控制台应用程序中的 Main。
  • 我尝试使用我的 public IObservable: input2.Select(s2 =&gt; Tuple.Create(2, (TInput2)s2)) .Merge(input1.Select(s1 =&gt; Tuple.Create((int)1, (TInput1)s1))) 的类型来调整您的代码,但是我的编译器抱怨合并也无法推断类型参数
  • @jlarsch 离开电脑几个小时。请更新您的问题以包含修改后的代码,我会为您更正。上面的代码确实有效,您只需要更改类型即可。
【解决方案3】:

我喜欢这些 Rx 谜题。不敢相信有人会为此付出代价。所以我想出了一个稍微不同的方法。我认为这里的竞争条件存在一些弱点,但我很好奇您的想法以及如何消除这些弱点。

基本思想是将队列视为递归缓冲区,直到在 source1 上,缓冲区在没有第一个元素的情况下重播到队列中。

更新

基于 shlomo 观察到需要 publish().refcount(),我更新了代码并将解决方案变成了扩展“RegulatedQueue”。请看下面的代码。 Input2为队列调节源,Input1为调节信号。

public static class RxHelpers
{
    public static IObservable<TInput2> RegulatedQueue<TInput1, TInput2>(this IObservable<TInput2> input2,
       IObservable<TInput1> input1
        )
    {
        return Observable.Using(() => new Subject<TInput2>(),
        queue =>
        {
            input2.Subscribe(queue);
            return queue
                .Buffer(() => input1)
                .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
                .Where(l => l.Count > 0)
                .Select(l => l.First()).
                Publish().
                RefCount();
        });
    }
}


class Program
{


    static void Main(string[] args)
    {
        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(2000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));

        var merged = source2.RegulatedQueue(source1);

        merged.Subscribe(x => Console.WriteLine("Merged1 " + x));
        merged.Subscribe(x => Console.WriteLine("Merged2 " + x));






        Console.ReadKey();

    }
}

已过时

  static void Main(string[] args)
    {
        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(7000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));

        //THIS BIT
         Subject<int> queue = new Subject<int>();
        source2.Subscribe(queue);
        var merged=queue
            .Buffer(() => source1)
            .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
            .Where(l=>l.Count > 0)
            .Select(l => l.First());





            merged.Subscribe(x => Console.WriteLine("Merged "+x));







        Console.ReadKey();

    }

测试代码:

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(1000.Ms(), 1),
    ReactiveTest.OnNext(2000.Ms(), 2),
    ReactiveTest.OnNext(3000.Ms(), 3),
    ReactiveTest.OnNext(4000.Ms(), 4),
    ReactiveTest.OnNext(5000.Ms(), 5),
    ReactiveTest.OnNext(6000.Ms(), 6),
    ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(1400.Ms(), "a"),
    ReactiveTest.OnNext(1500.Ms(), "b"),
    ReactiveTest.OnNext(1600.Ms(), "c"),
    ReactiveTest.OnNext(5500.Ms(), "d"),
    ReactiveTest.OnNext(5600.Ms(), "e"),
    ReactiveTest.OnNext(5700.Ms(), "f")
);

Subject<string> queue = new Subject<string>();
input2.Subscribe(queue);
var result = queue
    .Buffer(() => input1)
    .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
    .Where(l => l.Count > 0)
    .Select(l => l[0]);

result.Timestamp(scheduler)
    .Select(t => $"{t.Timestamp.Ticks} ticks: {t.Value}")
    .Dump(); //Linqpad

预期输出:

//14000000 enqueue a
//15000000 enqueue b
//16000000 enqueue c
20000000 ticks: a 
30000000 ticks: b 
40000000 ticks: c 
50000000 ticks: c 
//55000000 enqueue d
//56000000 enqueue e
//57000000 enqueue f
60000000 ticks: c //should really be d, but there's no handling for fake-empty ejection
70000000 ticks: d 
80000000 ticks: e 
90000000 ticks: f 
100000000 ticks: f 
110000000 ticks: f 
120000000 ticks: f 
130000000 ticks: f 
140000000 ticks: f 
...

实际输出:

20000000 ticks: a 
30000000 ticks: b 
40000000 ticks: c 
50000000 ticks: b 
60000000 ticks: c 
70000000 ticks: b 
80000000 ticks: c 
90000000 ticks: c 
100000000 ticks: b 
110000000 ticks: c 
120000000 ticks: c 
130000000 ticks: b 
140000000 ticks: c 
150000000 ticks: b 
160000000 ticks: c 
170000000 ticks: b 
180000000 ticks: c 
190000000 ticks: c 

【讨论】:

  • 测试代码失败(不断发出 b-c),但我不知道为什么。我建议使用可测试值而不是随机值。你可以很容易地用它来欺骗自己。
  • 添加了测试代码的编辑。该方法应该有效,并让我想起了很多我经常回头的答案:stackoverflow.com/questions/4123178/…
  • @Shlomo 好的,我现在得下车,稍后再检查。它适用于我拥有的代码,但我认为它的基本形式应该容易受到与线程相关的错误的影响。
  • 哦哈哈。这是一个多订阅者问题:多个订阅者会排队,导致重复排队。单订户工作正常。添加.Publish().RefCount()和多订阅者作品。
  • @shlomo 啊,我明白了。不过,认为它需要从同步的角度进行一些研究。 OP 现在可以将其用作易于阅读的玩具点头解决方案。
猜你喜欢
  • 2013-06-01
  • 2011-09-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-12-07
  • 2011-10-18
  • 2018-12-27
  • 1970-01-01
相关资源
最近更新 更多