【问题标题】:Extremely long Rx event chains极长的 Rx 事件链
【发布时间】:2017-10-23 18:02:39
【问题描述】:

我正在开发最适合描述为模拟/工作流程/游戏逻辑引擎的东西(尽管它不完全属于任何这些类别)。

其目的是让它完全由事件驱动(反应式),并且必须支持数万甚至数十万个链式事件的可能性,具有分支、过滤和并发性,以及所有 Rx 优点。

我是非常响应式扩展的新手,并决定编写我能想到的最简单的测试(将一堆 ISubject 链接在一起)。我很快发现将太多事件(在我的情况下大约为 12000 个)链接在一起会导致 StackOverflowException ——这对我来说是有意义的,因为考虑到 Rx 实际上只是以新颖的方式将事件处理程序连接在一起,而调用堆栈只能变得如此之深。

所以我正在寻找一种(反应式?)解决此限制的方法。我不能是唯一一个想用这个框架做一些非常大的事情的人。社区可以提供的任何帮助将不胜感激。

这是我的测试代码:

class Program
{
    static void Main(string[] args)
    {
        for (int i = 0; i < 1000000; i += 1000)
        {
            Console.Write($"{i} ");
            using (Dynamite dynamite = new Dynamite())
            {
                dynamite.Setup(i);
                dynamite.Trigger();
            }
        }
        Console.ReadKey();

    }
}

public class Dynamite : IDisposable
{
    ISubject<bool> start = null;
    IList<IDisposable> handles = new List<IDisposable>();

    public void Setup(int length)
    {
        length = length == 0 ? 1 : length;

        var fuses =
            Enumerable.Range(0, length)
            .Select(v => new Subject<bool>())
            .ToArray();

        ISubject<bool> prev = null;

        foreach (var fuse in fuses)
        {
            //Console.Write(".");

            if (prev != null)
            {
                Attach(prev, fuse);
            }
            prev = fuse;
        }

        start = fuses.First();
        var end = fuses.Last();

        handles.Add(
            end
                .Subscribe(onNext: b =>
                {
                    //Console.Write("t");
                    this.Explode();
                }));
    }

    void Attach(ISubject<bool> source, ISubject<bool> dest)
    {
        var handle = source
            .Subscribe(onNext: b =>
             {
                 //Console.Write("s");
                 dest.OnNext(b);
             });
        handles.Add(handle);
    }

    public void Trigger()
    {
        //Console.Write("p");
        start.OnNext(true);
    }

    void Explode()
    {
        Console.WriteLine("...BOOM!");
    }

    public void Dispose()
    {
        foreach (var h in handles)
            h.Dispose();
    }
}

这是控制台输出:

0 ...BOOM!
1000 ...BOOM!
2000 ...BOOM!
3000 ...BOOM!
4000 ...BOOM!
5000 ...BOOM!
6000 ...BOOM!
7000 ...BOOM!
8000 ...BOOM!
9000 ...BOOM!
10000 ...BOOM!
11000 ...BOOM!
12000
Process is terminated due to StackOverflowException.

【问题讨论】:

    标签: c# observable system.reactive


    【解决方案1】:

    我找到了解决方案。 CurrentThread 调度程序。

    .ObserveOn(Scheduler.CurrentThread)
    

    CurrentThreadScheduler [...] 将安排在进行原始调用的线程上执行的操作。该动作不会立即执行,而是放入队列中,仅在当前动作完成后执行。

    【讨论】:

    • 你能解释一下这是怎么回答的吗?
    • @Enigmativity 由于序列中的每个操作都会被调度,因此不会立即在同一个 CallStack 中执行。这将防止 StackOverFlowException 发生。
    • 还有其他选择吗?
    • @ErinLoy - 我不明白你的解释。你能解释一下,因为序列中的每个操作都是预定的,并且使用.ObserveOn(Scheduler.CurrentThread) 将使它使用相同的调用堆栈(它没有),这是如何工作的?
    • @Enigmativity 在单线程场景中,Rx 默认使用 ImmediateScheduler,它只是调用 OnNext 操作,生成当前堆栈。将足够多的这些链接在一起,您就会溢出堆栈。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-03-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多