【问题标题】:Why my Disruptor program don't take full advantage of the ringbuffer为什么我的 Disruptor 程序没有充分利用 ringbuffer
【发布时间】:2018-01-20 03:51:41
【问题描述】:

Disruptor github地址为:https://github.com/LMAX-Exchange/disruptor

我有一个简单的测试如下:

public class DisruptorMain {
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void main(String[] args) throws Exception {
        class Element {

            private int value;

            public int get() {
                return value;
            }

            public void set(int value) {
                this.value = value;
            }

        }

        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) {
                try {
                    Thread.sleep(1000 * sequence);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("Element: " + element.get());
            }
        };

        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        int bufferSize = 4;

        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        disruptor.handleEventsWith(handler);

        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; l < 8; l++) {
            long sequence = ringBuffer.next();
            System.out.println("sequence:" + sequence);

            try {
                Element event = ringBuffer.get(sequence);
                event.set(l);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
}

结果是: 序列:0 顺序:1 顺序:2 顺序:3 元素:0 元素:1 元素:2 元素:3 顺序:4 序列:5 序列:6 顺序:7 元素:4 元素:5 元素:6 元素:7

在我的测试中,我定义了一个大小为 4 的 ringbuffer,我有一个生产者为其创建 8 个任务,我的问题是,当生产者将 4 个任务放入 ringbuffer 时,消费者开始接受任务从 ringbuffer 开始工作,task 1 完成后 ringbuffer 应该有一个空的空间给 task 5,但是结果表明,只有 ringbuffer 中的所有任务都完成了,ringbuffer 才能接受新的任务,为什么?

【问题讨论】:

    标签: java producer-consumer disruptor-pattern


    【解决方案1】:

    这是因为 Disruptor 将批处理事件处理程序。如果事件处理程序很慢或环形缓冲区很小,批量大小通常可以是环形缓冲区的大小。 Disruptor 只会更新该事件处理程序的已处理序列,直到批处理完成。这减少了它需要对发布者使用的序列变量进行更新以确定是否有可用空间的次数。如果您需要比默认空间更早地提供可用空间,则可以使用 SequenceReportingEventHandler 来实现。

    public class MyEventHandler implements SequenceReportingEventHandler<Element> {
        Sequence processedSequence;
    
        public void setSequenceCallback(Sequence s) {
            processedSequence = s;
        }
    
        public void onEvent(Element e, long sequence, boolean endOfBatch) {
            // Do stuff
            processedSequence.set(sequence);
        }
    }
    

    【讨论】:

    • 可以自己控制batch size吗?
    • 计算接收到的事件数量,一旦达到阈值,就将其视为批处理已完成,即调用processedSequence::set 并刷新任何底层资源,例如提交数据库连接或将任何批处理数据推送到 I/O 调用。
    • Re:“Disruptor 只会更新该事件处理程序的已处理序列,直到批处理完成”您回答中的句子:这是默认行为,还是它受等待策略或其他影响?如果这是默认行为(即默认处理批处理,并且保证在处理批处理中的最后一个事件之前不会覆盖带有 endOfBatch=false 的事件槽),BatchEventProcessor 给我们买了什么?我的印象是,它通过处理序列处理来帮助这种用法。
    • BatchEventProcessor 是该代码中实现所描述行为的部分。默认是一个批处理,它是所有可用消息的大小,并在批处理结束时更新序列。这不受 WaitStrategy 影响。您可以使用 setSequenceCallback 更改行为。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多