【问题标题】:Custom blocking queue locking issue自定义阻塞队列锁定问题
【发布时间】:2013-05-19 17:37:50
【问题描述】:

我正在尝试使用固定长度的字节数组来自定义阻塞队列的实现。我没有删除轮询元素,因此我调整了 put 方法以返回字节数组,以便可以直接写入(生产者线程使用 MappedByteBuffer 直接写入此字节数组)。我添加了“commitPut()”方法来简单地增加计数器并设置“长度”数组。 (如果有多个线程在写,这可能是并发问题,但我知道只有一个线程在写)。

以下是我目前拥有的。如果我逐步调试它会起作用,但是如果我“运行”它看起来会遇到一些锁定问题。我复制、剥离和调整了 ArrayBlockingQueue 代码。有更好知识的人可以看看课程并告诉我我做错了什么,或者如何做得更好(比如直接写入缓冲区并同时设置长度数组和计数器)?

public class ByteArrayBlockingQueue {

    private final int[] lens; // array to valid lengths
    private final byte[][] items; // array of byte arrays

    private int takeIndex = 0;
    private int putIndex = 0;
    private int count = 0;

    public volatile int polledLen = 0; // lenght of last polled byte array

    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    final int inc(int i) {
        return (++i == items.length)? 0 : i;
    }

    public ByteArrayBlockingQueue(int capacity, int size, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new byte[capacity][size];
        this.lens = new int[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull  = lock.newCondition();
    }

    public byte[] put() throws InterruptedException {
        final byte[][] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();

            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            //insert(e, len);
            return items[putIndex];
        } finally {
            lock.unlock();
        }
    }

    public void commitPut(int lenBuf) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            lens[putIndex] = lenBuf;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public byte[] poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            final byte[][] items = this.items;
            final int[] lens = this.lens;
            byte[] e = items[takeIndex];
            this.polledLen = lens[takeIndex];
            //items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return e;

        } finally {
            lock.unlock();
        }
    }
}

【问题讨论】:

  • 您能描述一下您遇到的“锁定问题”吗?
  • 似乎“生产者线程”锁定了“消费者线程”。我不太了解这些锁,尽管它们对我来说似乎很好(与其他内置阻塞队列的原理几乎相同)。如果我使用调试器逐步完成,在“poll”和“put”中设置断点 - 它可以工作。我可以看到位图(字节数组中的字节)正确解码。一旦我“运行通过”它就会失败(无法解码字节) - 如果我尝试调试轮询值,表达式观察器会失败并出现一些“子更新”错误,所以我认为这是一个锁定问题。

标签: java android locking queue blocking


【解决方案1】:

如果队列环绕,字节数组可能会在被消费者读取之前被重用和覆盖。简而言之,您需要有一个 commitGet 方法来确保生产者在用新数据覆盖数组之前等待消费者。

但是,我的建议是您依靠 java.util.concurrent.BlockingQueue 拥有第二个队列将它们从消费者返回到生产者,并依靠 java.nio.ByteByffer 来跟踪长度。生产者会这样做:

ByteBuffer buffer = bufferQueue.poll(); // instead of your put()
buffer.put(source);                     // fill buffer from source MappedByteBuffer
buffer.flip();                          // set length to the amount written
dataQueue.offer(buffer);                // instead of commitPut()

消费者会:

ByteBuffer buffer = dataQueue.poll();   // instead of your get()
buffer.get(...);                        // use data
buffer.clear();                         // reset length               
bufferQueue.offer(buffer);              // this is the missing commitGet()

您最初应该在freeQueue 中插入capacity 元素。但是请注意,这仍然会将数据从 source 缓冲区复制一次到队列中的临时缓冲区中,就像您的原始代码已经做的那样。

如果您真的不想复制数据(并确保在所有消费者都读取数据之前源不会更改!),您更好的选择是使用单个阻塞队列并插入使用ByteBuffer.slice() 获得的缓冲区从源缓冲区中获取要传递给消费者的每个数据块。然后这些将被垃圾收集,但应该比字节数组本身占用更少的内存。

【讨论】:

  • 感谢您的帮助。为什么我要尝试重用字节数组 - 在我使用简单的 ArrayBlockingQeueu 和位图之前。我有正在填充 qeueu 和消费者(抽屉)消费位图的生产者。至少我认为这是有效的,直到我在一些“有趣”的低预算安卓平板电脑上尝试它,GC 一直在为最小的分配启动(总共我使用了 48 个允许的 10MB,所以甚至没有关闭了解为什么 GC 在这款平板电脑上如此激进)。出于这个原因,我决定尝试使用固定分配的字节数组,重用它而不给 GC 原因启动。
  • 我明白了。使用第二个队列返回空数组应该是安全的。
  • 我仍然试图围绕你的两个队列概念来解决我的问题......你能再解释一下吗?如果我有生产者“阅读器”想要从 MappedByteBuffer 直接读取字节到“阅读器”队列(以避免字节复制)并在队列已满时等待,而消费者“抽屉”则从阅读器队列中消耗字节。因为字节数组是固定长度的(以容纳最大的位图),我仍然需要跟踪每个队列槽的“字节长度”。
  • 我已将回复更新为使用ByteBuffer。请注意,您的解决方案不会避免复制,因为它会重用字节数组。带有slice() 的单个队列将完全避免复制。
  • 我在原始代码中发现了问题 - 在 poll() notFull.signal() 发生在“返回”之前,生产者在消费者处理“轮询”之前开始填充新字节。这导致消费者处理第 11 个“字节帧”而不是第 1 个,但长度为第 1 个。目前,我将尝试修复该代码,因为我对此感到满意。但我肯定会试一试您的解决方案,并尝试将我的头绕在它周围,同时检查 slice()。这个领域对我来说有点新,需要一些时间来征服。感谢您的时间和帮助!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-10-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多