【问题标题】:Spring Reactor benefits when Number of Publisher threads is much higher than number of consumers当发布者线程数远高于消费者数时,Spring Reactor 受益
【发布时间】:2015-01-06 17:26:12
【问题描述】:

我有以下用例:

  • N 个线程发布数据(N 可以从 10 到 1000 个线程),这些线程可以只使用本地计算机进行 HTTP 请求、jdbc 调用、纯 java 处理
  • 1 到 M 线程消耗它进行 IO(发送 HTTP 请求,写入数据库......可能批量),这些线程不应该减慢发布者的速度。 M 不得超过 10 个线程。

N 个线程发布数据的速度可能比消费者消费它的速度快得多,但其想法是最大限度地减少发布者的减速。

我已经实现了一种基于 ArrayBlockingQueue 的方法,其中发布者写入,以及一个从队列中获取数据并处理它的线程,它可以工作但结果不是很好。

因此,我正在研究 Reactor 模式,尤其是 Spring-Reactor,看看它是否可以响应我的用例。是这样吗?

我读到了:

在我的情况下,发布者线程的数量远高于消费者的数量,这是正确的选择吗?

【问题讨论】:

    标签: java multithreading spring reactor


    【解决方案1】:

    听起来您可能想查看 Reactor 的 PersistentQueue facility 并将您的发布者与订阅者分开。这是一个普通的Queue 实现,但它使用 Chronicle Queue 来实现持久性、故障转移和可重放性。它也非常非常快。

    您基本上会让发布者从一侧将数据推送到 PersistentQueue,而一组订阅者从另一侧拉出数据。如果您已经在使用Queue,它可能是您当前使用的替代品。

    我需要在上面写一个 wiki 页面来展示一些基本的使用模式。

    【讨论】:

    • 非常感谢您的回答。是的,我认为 wiki 页面会很棒。作为图书馆的新用户,这听起来很有希望,但在我看来,目前缺乏文档。应该为该领域的初学者解释一些概念,并关注哪些部分是线程安全的,哪些不是。
    【解决方案2】:

    我使用自定义容器类处理了类似的问题。它通过 CAS 对象使用双缓冲方法,允许您在一次无锁操作中读取所有累积的对象。

    我不知道它的效率如何,但它的简单性应该可以确保它与优秀的产品并驾齐驱。

    请注意,下面的大部分代码是测试代码 - 您可以删除 //TESTING 注释下方的所有代码而不影响功能。

    /**
     * Lock free - thread-safe.
     *
     * Write from many threads - read with fewer threads.
     *
     * Write items of type T.
     *
     * Read items of type List<T>.
     *
     * @author OldCurmudgeon
     * @param <T> - Th etype we plan to write/read.
     */
    public class DoubleBufferedList<T> {
    
        /**
         * Atomic reference so I can atomically swap it through.
         *
         * Mark = true means I am adding to it so momentarily unavailable for iteration.
         */
        private final AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);
    
        // Factory method to create a new list - may be best to abstract this.
        protected List<T> newList() {
            return new ArrayList<>();
        }
    
        /**
         * Get and replace the current list.
         *
         * Used by readers.
         *
         * @return List<T> of a number (possibly 0) of items of type T.
         */
        public List<T> get() {
            // The list that was there.
            List<T> it;
            // Replace an unmarked list with an empty one.
            if (!list.compareAndSet(it = list.getReference(), newList(), false, false)) {
                // Mark was not false - Failed to replace!
                // It is probably marked as being appended to but may have been replaced by another thread.
                // Return empty and come back again soon.
                return Collections.<T>emptyList();
            }
            // Successfull replaced an unmarked list with an empty list!
            return it;
        }
    
        /**
         * Grab and lock the list in preparation for append.
         *
         * Used by add.
         */
        private List<T> grab() {
            List<T> it;
            // We cannot fail so spin on get and mark.
            while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
                // Spin on mark - waiting for another grabber to release (which it must).
            }
            return it;
        }
    
        /**
         * Release the grabbed list.
         *
         * Opposite of grab.
         */
        private void release(List<T> it) {
            // Unmark it - should this be a compareAndSet(it, it, true, false)?
            if (!list.attemptMark(it, false)) {
                // Should never fail because once marked it will not be replaced.
                throw new IllegalMonitorStateException("It changed while we were adding to it!");
            }
        }
    
        /**
         * Add an entry to the list.
         *
         * Used by writers.
         *
         * @param entry - The new entry to add.
         */
        public void add(T entry) {
            List<T> it = grab();
            try {
                // Successfully marked! Add my new entry.
                it.add(entry);
            } finally {
                // Always release after a grab.
                release(it);
            }
        }
    
        /**
         * Add many entries to the list.
         *
         * @param entries - The new entries to add.
         */
        public void add(List<T> entries) {
            List<T> it = grab();
            try {
                // Successfully marked! Add my new entries.
                it.addAll(entries);
            } finally {
                // Always release after a grab.
                release(it);
            }
        }
    
        /**
         * Add a number of entries.
         *
         * @param entries - The new entries to add.
         */
        @SafeVarargs
        public final void add(T... entries) {
            // Make a list of them.
            add(Arrays.<T>asList(entries));
        }
    
        // TESTING.
        // How many testers to run.
        static final int N = 10;
        // The next one we're waiting for.
        static final AtomicInteger[] seen = new AtomicInteger[N];
        // The ones that arrived out of order.
        static final ConcurrentSkipListSet<Widget>[] queued = Generics.<ConcurrentSkipListSet<Widget>>newArray(N);
    
        static class Generics {
    
            // A new Generics method for when we switch to Java 7.
            @SafeVarargs
            static <E> E[] newArray(int length, E... array) {
                return Arrays16.copyOf(array, length);
            }
        }
    
        static {
            // Populate the arrays.
            for (int i = 0; i < N; i++) {
                seen[i] = new AtomicInteger();
                queued[i] = new ConcurrentSkipListSet<>();
            }
        }
    
        // Thing that is produced and consumed.
        private static class Widget implements Comparable<Widget> {
    
            // Who produced it.
            public final int producer;
            // Its sequence number.
            public final int sequence;
    
            public Widget(int producer, int sequence) {
                this.producer = producer;
                this.sequence = sequence;
            }
    
            @Override
            public String toString() {
                return producer + "\t" + sequence;
            }
    
            @Override
            public int compareTo(Widget o) {
                // Sort on producer
                int diff = Integer.compare(producer, o.producer);
                if (diff == 0) {
                    // And then sequence
                    diff = Integer.compare(sequence, o.sequence);
                }
                return diff;
            }
        }
    
        // Produces Widgets and feeds them to the supplied DoubleBufferedList.
        private static class TestProducer implements Runnable {
    
            // The list to feed.
            final DoubleBufferedList<Widget> list;
            // My ID
            final int id;
            // The sequence we're at
            int sequence = 0;
            // Set this at true to stop me.
            public volatile boolean stop = false;
    
            public TestProducer(DoubleBufferedList<Widget> list, int id) {
                this.list = list;
                this.id = id;
            }
    
            @Override
            public void run() {
                // Just pump the list.
                while (!stop) {
                    list.add(new Widget(id, sequence++));
                }
            }
        }
    
        // Consumes Widgets from the suplied DoubleBufferedList
        private static class TestConsumer implements Runnable {
    
            // The list to bleed.
            final DoubleBufferedList<Widget> list;
            // My ID
            final int id;
            // Set this at true to stop me.
            public volatile boolean stop = false;
    
            public TestConsumer(DoubleBufferedList<Widget> list, int id) {
                this.list = list;
                this.id = id;
            }
    
            @Override
            public void run() {
                // The list I am working on.
                List<Widget> l = list.get();
                // Stop when stop == true && list is empty
                while (!(stop && l.isEmpty())) {
                    // Record all items in list as arrived.
                    arrived(l);
                    // Grab another list.
                    l = list.get();
                }
            }
    
            private void arrived(List<Widget> l) {
                for (Widget w : l) {
                    // Mark each one as arrived.
                    arrived(w);
                }
            }
    
            // A Widget has arrived.
            private static void arrived(Widget w) {
                // Which one is it?
                AtomicInteger n = seen[w.producer];
                // Don't allow multi-access to the same producer data or we'll end up confused.
                synchronized (n) {
                    // Is it the next to be seen?
                    if (n.compareAndSet(w.sequence, w.sequence + 1)) {
                        // It was the one we were waiting for! See if any of the ones in the queue can now be consumed.
                        for (Iterator<Widget> i = queued[w.producer].iterator(); i.hasNext();) {
                            Widget it = i.next();
                            // Is it in sequence?
                            if (n.compareAndSet(it.sequence, it.sequence + 1)) {
                                // Done with that one too now!
                                i.remove();
                            } else {
                                // Found a gap! Stop now.
                                break;
                            }
                        }
                    } else {
                        // Out of sequence - Queue it.
                        queued[w.producer].add(w);
                    }
                }
            }
        }
    
        // Main tester
        public static void main(String args[]) {
            try {
                System.out.println("DoubleBufferedList:Test");
                // Create my test buffer.
                DoubleBufferedList<Widget> list = new DoubleBufferedList<>();
                // All running threads - Producers then Consumers.
                List<Thread> running = new LinkedList<>();
                // Start some producer tests.
                List<TestProducer> producers = new ArrayList<>();
                for (int i = 0; i < N; i++) {
                    TestProducer producer = new TestProducer(list, i);
                    Thread t = new Thread(producer);
                    t.setName("Producer " + i);
                    t.start();
                    producers.add(producer);
                    running.add(t);
                }
    
                // Start the same number of consumers (could do less or more if we wanted to).
                List<TestConsumer> consumers = new ArrayList<>();
                for (int i = 0; i < N; i++) {
                    TestConsumer consumer = new TestConsumer(list, i);
                    Thread t = new Thread(consumer);
                    t.setName("Consumer " + i);
                    t.start();
                    consumers.add(consumer);
                    running.add(t);
                }
                // Wait for a while.
                Thread.sleep(5000);
                // Close down all.
                for (TestProducer p : producers) {
                    p.stop = true;
                }
                for (TestConsumer c : consumers) {
                    c.stop = true;
                }
                // Wait for all to stop.
                for (Thread t : running) {
                    System.out.println("Joining " + t.getName());
                    t.join();
                }
                // What results did we get?
                int totalMessages = 0;
                for (int i = 0; i < N; i++) {
                    // How far did the producer get?
                    int gotTo = producers.get(i).sequence;
                    // The consumer's state
                    int seenTo = seen[i].get();
                    totalMessages += seenTo;
                    Set<Widget> queue = queued[i];
                    if (seenTo == gotTo && queue.isEmpty()) {
                        System.out.println("Producer " + i + " ok.");
                    } else {
                        // Different set consumed as produced!
                        System.out.println("Producer " + i + " Failed: gotTo=" + gotTo + " seenTo=" + seenTo + " queued=" + queue);
                    }
                }
                System.out.println("Total messages " + totalMessages);
    
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-02-17
      • 2021-02-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-24
      相关资源
      最近更新 更多