【问题标题】:How to implement round-robin order for PriorityBlockingQueue?如何实现 PriorityBlockingQueue 的循环排序?
【发布时间】:2015-01-02 05:33:13
【问题描述】:

我有一个PriorityBlockingQueue。单个线程一次使用此队列中的一条消息并对其进行处理。其他几个线程正在将消息插入队列。生产者线程为他们提交的每条消息分配一个完整的优先级。静态AtomicLong 用于为每条消息分配一个唯一的、单调递增的ID。队列的Comparator先按此优先级对消息进行排序,然后同等优先级的消息按ID排序(ID最低的在前)。

问题:有时一个生产者会提交大量消息。然后,这会使其他生产者无法处理他们的消息。我想做的是在生产者之间进行消费者循环以获取相同优先级的消息(同时仍按提交顺序处理单个生产者的同等优先级消息)。但我不知道如何编写 Comparator 来做到这一点。

我考虑的另一种选择是为每个生产者设置一个单独的队列。但是,我认为这行不通,因为我不知道单线程在多个队列上等待的任何方式。

【问题讨论】:

    标签: java multithreading priority-queue java.util.concurrent round-robin


    【解决方案1】:

    我觉得为每个生产者使用一个Queue 来实现这一点更简单。一个线程不能等待多个Queues,但您可以将所有Queues 合并到一个辅助类中,这样就不需要了。

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.NoSuchElementException;
    import java.util.Queue;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    import javax.annotation.concurrent.GuardedBy;
    
    public class RoundRobin<P, E> {
        private final Lock lock = new ReentrantLock();
        private final Condition added = lock.newCondition();
    
        @GuardedBy("lock") private final Map<P, Queue<E>> queues = new LinkedHashMap<>();
    
        public boolean add(P producer, E item) {
            lock.lock();
            try {
                if (!queues.containsKey(producer)) {
                    queues.put(producer, new PriorityBlockingQueue<>());
                }
    
                added.signalAll();
                return queues.get(producer).add(item);
            } finally {
                lock.unlock();
            }
        }
    
        public Iterator<E> roundRobinIterator() {
            return new Iterator<E>() {
                private Iterator<? extends Queue<E>> i = null;
                private boolean singlePass = true;
    
                @Override
                public boolean hasNext() {
                    return true;
                }
    
                @Override
                public E next() {
                    lock.lock();
                    try {
                        while (true) {
                            if (i == null || !i.hasNext()) {
                                i = queues.values().iterator();
                                singlePass = true;
                            }
    
                            while (i.hasNext()) {
                                Queue<E> q = i.next();
                                if (!q.isEmpty()) {
                                    if (singlePass) {
                                        // copy the iterator to prevent
                                        // ConcurrentModificationExceptions
                                        singlePass = false;
                                        i = copy(i);
                                    }
                                    return q.poll();
                                }
                            }
    
                            if (singlePass) {
                                // If singlePass is true then we just checked every
                                // queue and they were all empty.
                                // Wait for another element to be added.
                                added.await();
                            }
                        }
                    } catch (InterruptedException e) {
                        throw new NoSuchElementException(e.getMessage());
                    } finally {
                        lock.unlock();
                    }
                }
    
                private <T> Iterator<? extends T> copy(Iterator<? extends T> i) {
                    List<T> copy = new ArrayList<>();
                    while (i.hasNext()) {
                        copy.add(i.next());
                    }
                    return copy.iterator();
                }
            };
        }
    }
    

    【讨论】:

    • 我喜欢这个想法(特别是使用 ReenterantLock 和 Condition)。我会尝试类似的东西。
    • 由于外部有 ReentrantLock,因此内部不需要 PriorityBlockingQueue,PriorityQueue 就可以了——双重锁定会带来更差的性能和更大的死锁风险(不是您当前的代码,但将来可能会更改)。
    • 另外,当我需要更高的优先级来应用不同生产者的消息时,您的解决方案为每个生产者都有一个单独的优先级队列。你有 LinkedHashMap;我最终基本上做了 Map>。我接受你的回答,因为它让我在正确的轨道上思考,并且最接近我最终实际做的事情。
    【解决方案2】:

    我想我会这样做:

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    
    public class RRQueue<M> {
        private final ThreadLocal<Queue<M>> threadQueue = new ThreadLocal<>();
        private final List<Queue<M>> queues;
        private int current = 0;
    
        public RRQueue() {
            this.queues = new ArrayList<>();
        }
    
        public synchronized void add(M msg) {
            Queue<M> queue = threadQueue.get();
            if (queue == null) {
                queue = new LinkedList<>(); // or whatever
                queues.add(queue);
                threadQueue.set(queue);
            }
            queue.add(msg);
            notify();
        }
    
        public synchronized M get() throws InterruptedException {
            while (true) {
                for (int i = 0; i < queues.size(); ++i) {
                    Queue<M> queue = queues.get(current);
                    current = (current+1)%queues.size();
                    if (!queue.isEmpty()) {
                        return queue.remove();
                    }
                }
                wait();
            }
        }
    }
    

    【讨论】:

    • 我认为这可行。但是,它没有解决优先级问题。
    【解决方案3】:

    这完全取决于您如何分配 ID。将它们分开分配 N,其中 N 是生产者的数量,并将每个生产者的索引添加到其中。然后按顺序读取它们将产生循环顺序。您需要做一点记账才能知道何时增加底层 ID,这将在您达到任何 xNx-1 时发生。

    【讨论】:

    • 有两个生产者,你是说给第一个生产者分配偶数ID,给第二个生产者分配奇数ID?假设生产者 #1 提交了 10 条消息,这些消息已完全处理 - 它使用前十个奇数 ID。一段时间后,生产者 #1 提交了 10 条消息,这些消息获得了接下来的 10 个奇数 ID。同时,生产者#2 提交了 10 条消息,这些消息消耗了前 10 个偶数 ID。这不会导致生产者 #2 的第一批在生产者 #1 的第二批之前处理吗?这不是未完成消息的循环。
    • 没有。根据我的最后一句话,您将增加基础 ID,因此在您的示例中,ID 将运行 0,2,4,6,8,10,12,14,16,18,19, ...
    • 所以你会有一个计数器,它以 N 递增(其中 N 是当前生产者计数),以及一个大小为 N 的布尔标志数组,最初都是假的。当生产者 x 请求一个 ID 时,如果 flags[x] 为 false,则设置 flags[x]=true;如果 flags[x] 为真,则将 counter 增加 N,将所有标志设置为 false,设置 flags[x]=true;那么,无论哪种情况,都返回 counter+x?我还假设一个同步块来保护计数器和数组。此外,要动态更改生产者计数,需要调整数组大小,将其所有值设置为 false,递增计数器。
    • 记账的细节我还没想好,也不一定只有这样,但我给的顺序就是你想要的,是吗?
    • 假设生产者 #1 提交了 100 条消息。到目前为止,已经处理了其中的 50 个。现在生产者#2 提交了 100 条消息。由于增加了底层 ID,生产者 #2 的所有消息的 ID 号是否都高于生产者 #1 的 ID 号,所以在生产者 #1 完成之前不会处理它们吗?这不是循环。
    【解决方案4】:

    可以使用 PriorityBlockingQueue 实现 N 个生产者之间的循环,如下所示。

    为每个生产者维护 N 个 AtomicInteger 计数器和一个全局计数器,以防生产者计数器相等。

    当添加到生产者的 Q incement 计数器以及全局计数器并存储到 Q 对象中时。 Q 的比较器将根据生产者计数器 1st 排序,然后是存储在 Q 对象中的全局计数器值。

    但是,当一个生产者的对象在 Q 中的某个时间为空时,相应的计数器就会落后,并且当对象开始进入时它将开始占用 Q。

    为避免这种情况,请维护一个 volatile 变量,该变量在出队时使用对象的生产者计数器进行更新。如果值小于 volatile 变量中最后一个出队的计数器,则在入队期间增加生产者计数器后,将计数器重置为该值 + 1。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-02-15
      • 1970-01-01
      • 1970-01-01
      • 2021-10-28
      相关资源
      最近更新 更多