【问题标题】:Disruptor example with 1 publisher and 4 parallel consumers具有 1 个发布者和 4 个并行消费者的破坏者示例
【发布时间】:2012-10-31 14:04:27
【问题描述】:

在这个例子中https://stackoverflow.com/a/9980346/93647 和这里Why is my disruptor example so slow?(在问题的末尾)有 1 个发布者和 1 个消费者。

但就我而言,消费者工作要复杂得多,需要一些时间。所以我想要 4 个并行处理数据的消费者。

例如,如果生产者生产数字:1,2,3,4,5,6,7,8,9,10,11..

我希望消费者 1 捕获 1,5,9,...消费者 2 捕获 2,6,10,...消费者 3 捕获 3,7,11,...消费者 4 捕获 4,8,12。 ..(不完全是这些数字,想法是数据应该并行处理,我不在乎哪个消费者处理了哪个特定数字)

请记住,这需要并行完成,因为在实际应用程序中,消费者工作非常昂贵。我希望消费者在不同的线程中执行以使用多核系统的强大功能。

当然,我可以只创建 4 个环形缓冲区并将 1 个消费者附加到 1 个环形缓冲区。这样我就可以使用原始示例。但我觉得这不会是正确的。创建 1 个发布者(1 个 ringbuffer)和 4 个消费者可能是正确的 - 因为这是我需要的。

在 google 群组中添加一个非常相似的问题的链接:https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

所以我们有两个选择:

  • 一个响铃多个消费者(每个消费者都会在每次添加时“唤醒”,所有消费者都应该有相同的 WaitStrategy)
  • 许多“一环 - 一个消费者”(每个消费者只会在它应该处理的数据上唤醒。每个消费者可以有自己的 WaitStrategy)。

【问题讨论】:

    标签: c# concurrency disruptor-pattern


    【解决方案1】:

    编辑:我忘了说部分代码取自the FAQ。我不知道这种方法是否比 Frank 的建议更好或更差。

    该项目的文档记录严重不足,很遗憾,因为它看起来不错。
    无论如何尝试以下片段(基于您的第一个链接) - 在单声道上测试并且似乎没问题:

    using System;
    using System.Threading.Tasks;
    using Disruptor;
    using Disruptor.Dsl;
    
    namespace DisruptorTest
    {
        public sealed class ValueEntry
        {
            public long Value { get; set; }
        }
    
        public class MyHandler : IEventHandler<ValueEntry>
        {
            private static int _consumers = 0;
            private readonly int _ordinal;
    
            public MyHandler()
            {
                this._ordinal = _consumers++;
            }
    
            public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
            {
                if ((sequence % _consumers) == _ordinal)
                    Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
                else
                    Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);                     
            }
        }
    
        class Program
        {
            private static readonly Random _random = new Random();
            private const int SIZE = 16;  // Must be multiple of 2
            private const int WORKERS = 4; 
    
            static void Main()
            {
                var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
                for (int i=0; i < WORKERS; i++)
                    disruptor.HandleEventsWith(new MyHandler());
                var ringBuffer = disruptor.Start();
    
                while (true)
                {
                    long sequenceNo = ringBuffer.Next();
                    ringBuffer[sequenceNo].Value =  _random.Next();;
                    ringBuffer.Publish(sequenceNo);
                    Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                    Console.ReadKey();
                }
            }
        }
    }
    

    【讨论】:

    • 你知道调用 ringbuffer Next() Publish 方法是线程安全的吗?我可以并行调用它们吗?我可以从两个不同的线程 ringbuffer Next 方法调用吗?
    • 中断者是如何在内部实现这一点的?创建了多少线程?破坏者是否为每个消费者创建单独的线程?还是使用了某种线程池?
    • 代码表明两者都设计为线程安全的,但我看不出你会从中获得什么。它依赖于 TPL(任务) - 只需阅读代码。
    • 谢谢,我创建了关于“锁定”的单独问题stackoverflow.com/questions/13350342/…
    【解决方案2】:

    从环形缓冲区的规范中,您会看到每个消费者都会尝试处理您的ValueEvent。在你的情况下你不需要那个。

    我是这样解决的:

    将已处理的字段添加到您的ValueEvent,当消费者接受他在该字段上测试的事件时,如果它已经被处理,他会转到下一个字段。

    不是最漂亮的方式,但缓冲区是这样工作的。

    【讨论】:

    • 您需要在该字段上进行同步吗?你是声明它volatile 还是使用Interlock 类来更新bool 字段?另外如何将多个消费者附加到环形缓冲区?我只能将一个消费者传递给HandleEventsWith 方法。到目前为止,对我来说,创建 4 个环形缓冲区并在每次发布时使用下一个环形缓冲区循环它们会更容易:)
    • 如果您创建 4 个环形缓冲区,您将失去环形缓冲区的“负载平衡”功能,我想这就是您使用它的原因。在您的其他 Q 上,我使用的是 JAVA 缓冲区,所以我无法向您展示代码,但请按照示例进行操作,它们非常清楚。
    • 在这种特殊情况下,我不需要“负载平衡”,因为我的任务几乎是“相等的”,只需将它们分成 4 个就可以了。然而,这是一个有趣的功能。我应该遵循 Java 示例吗?因为我几乎找不到任何 c# 示例。
    • @Frank,你用过WorkerPool吗?
    • @AlexAverbuch 我用过:ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS);
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多