【问题标题】:LMAX Disruptor as a blocking queue?LMAX Disruptor 作为阻塞队列?
【发布时间】:2016-06-29 09:08:39
【问题描述】:

有什么方法可以让我在一个结构中同时拥有这两者 -

  1. BlockingQueue 的语义,即 - 非阻塞 peek、阻塞 poll 和阻塞 put。多个提供者一个消费者。
  2. RingBuffer,它有效地用作对象池,因此我不想将新对象放入环形缓冲区,而是想重用那里的现有对象,复制状态。所以基本上 LMAX 破坏者的功能是开箱即用的。

是否已经有这样的东西了? 我想我可以尝试使用 Disruptor,如果我理解正确的话,我已经可以将它用作阻塞队列(如果环形缓冲区“满”)。它已经具有我需要的“可重用对象”语义。所以唯一的问题是如何创建一个能够 PULL 对象(而不是使用回调)的客户端,因为我不太熟悉内部 Disruptor 结构 - 可以做到吗?使用所有这些排序器,创建一个新的 EventProcessor 或类似的东西?

不,在客户端有一个阻塞队列并从中获取的明显解决方案不是一个理想的解决方案,因为它打破了使用中断对象池的全部要点 - 您需要有一个新池现在,或者只是在放入阻塞队列等之前在回调中创建一个新对象,我根本不想创建任何垃圾。

那么有没有办法使用 Disruptor 或任何其他面向性能/无垃圾的 Java 库来实现它?

【问题讨论】:

  • 为什么需要阻塞操作?注意:你必须小心如何重用池中的对象,以确保 a) 你永远不会回收正在使用的对象 b) 它不会比创建新对象慢。
  • 消费将比将对象放入队列中慢,这就是重点 - 如果消费者还没有准备好消费新事件(按照他自己的节奏),所有推送者都需要被阻止。不确定我是否理解a点,抱歉。我需要阻塞操作,因为我需要阻塞队列,有很多正当理由寻求阻塞队列的语义。阻塞队列唯一没有给我的东西 - 我显然不能重用阻塞队列中的对象,我需要创建新对象(或使用外部对象池)。
  • “我需要阻塞操作,因为我需要阻塞队列”我假设您已经考虑过使用阻塞操作会更慢并且会严重影响您的延迟,但这对您的用例并不重要,所以我会使用一个执行阻塞操作的队列。
  • 简而言之,开发人员使用 Distributor 之类的模型是因为他们需要避免阻塞操作,而忙碌等待很可能是唯一的解决方案。如果你需要一个阻塞操作,你有一个完全不同的用例。
  • 好吧,我只需要按照自己的节奏消费,如果队列已满,我需要阻止推送者,仅此而已。当然它比不这样做要慢,但可悲的是没有办法解决这个问题。但至少它肯定可以在不产生额外垃圾的情况下完成,对吧?所以我认为的第一个解决方案是有一个常规的 BlockingQueue 并有一个相同大小的单独的普通 RingBuffer,所以每次推送到队列都会从 RingBuffer 中获取下一个对象(当然,将数据复制到它)。但这似乎很浪费,而且你需要小心多线程。

标签: java multithreading performance garbage-collection disruptor-pattern


【解决方案1】:

我们在今年早些时候开源了 Conversant Diruptor,其中包括 DiruptorBlockingQueue。你可以在github找到代码

Conversant Disruptor 几乎可以轻松包含在任何项目中,因为它支持 BlockingQueue api 并在 Maven Central 上发布。

【讨论】:

  • 非常感谢约翰,我一定会去看看!有没有关于它的基准/文章?
  • 是的,有一个disruptor benchmark。在我的测量中,Conversant Disruptor 比 MPMC 快 2 倍,LMAX 至少比 JDK 中的任何东西好 5 倍。我很想听听您在自己的应用程序中发现了什么。还有一半像样的wiki。我最近发表了一个讨论 Java 并发和 Disruptor 的演讲,可以在 YouTube 找到它。
【解决方案2】:

出于好奇,我无法从 Disruptor 本身获得“阻塞拉取”语义,但将“阻塞”功能添加到非阻塞拉取当然是微不足道的。 “Peek”功能本身是可能的,但效率不高(您需要在每次 peek 时一次又一次地复制项目)并且可以通过仅缓存“poll”的结果来替换。

所以,最小的原始解决方案只实现了我需要的方法:

public class DisruptorMPSCQueue<T extends ICopyable<T>> {

    private final RingBuffer<T> ringBuffer;
    private final EventPoller<T> eventPoller;
    private T tempPolledEvent;

    private EventPoller.Handler<T> pollerHandler = new EventPoller.Handler<T>() {
        @Override
        public boolean onEvent(final T event, final long sequence, final boolean endOfBatch) throws Exception {
            tempPolledEvent.copyFrom(event);
            return false;
        }
    };

    public DisruptorMPSCQueue(EventFactory<T> typeConstructor, int size) {
        ringBuffer = RingBuffer.createMultiProducer(typeConstructor, size);
        eventPoller = ringBuffer.newPoller();
        ringBuffer.addGatingSequences(eventPoller.getSequence());
    }

    /**
     * Blocking, can be called from any thread, the event will be copied to the ringBuffer
     */
    public void put(final T event) {
        long sequence = ringBuffer.next(); // blocked by ringBuffer's gatingSequence
        ringBuffer.get(sequence).copyFrom(event);
        ringBuffer.publish(sequence);
    }

    /**
     * Not blocking, can be called from any thread, the event will be copied to the ringBuffer
     *
     * @throws IllegalStateException if the element cannot be added at this time due to capacity restrictions
     */
    public void offer(final T event) {
        long sequence;
        try {
            sequence = ringBuffer.tryNext();
        } catch (InsufficientCapacityException e) {
            throw new IllegalStateException(e); // to mimic blockingQueue
        }
        ringBuffer.get(sequence).copyFrom(event);
        ringBuffer.publish(sequence);
    }

    /**
     * Retrieve top of the queue(removes from the queue). NOT thread-safe, can be called from one thread only.
     *
     * @param destination top of the queue will be copied to destination
     * @return destination object or null if the queue is empty
     */
    public T poll(final T destination) {
        try {
            tempPolledEvent = destination;  // yea, the poller usage is a bit dumb
            EventPoller.PollState poll = eventPoller.poll(pollerHandler);
            if (poll == EventPoller.PollState.PROCESSING) {
                return tempPolledEvent;
            } else {
                return null;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

【讨论】:

    猜你喜欢
    • 2013-12-21
    • 2013-11-23
    • 1970-01-01
    • 2014-10-16
    • 2012-05-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多