【问题标题】:Implementation of multi-consumer lock-free queue多消费者无锁队列的实现
【发布时间】:2019-04-30 21:04:51
【问题描述】:

最近我遇到了这个问题:据说有3个消费者线程,需要实现一个无锁队列(不能使用同步),这样就没有消费了线程被阻塞。假设队列已经包含数据。

我想了一会儿,发现原子操作,如果仔细使用会有所帮助。我的实现如下图所示。由于队列中已经存在数据,因此我没有实现 enqueue 方法并在构造函数中填充数组。

public class SPMCQueue {

    private AtomicInteger index = new AtomicInteger(0);

    public int[] arr;

    public SPMCQueue(int size) {
        arr = IntStream.range(0, size).toArray();
    }

    public Integer dequeue() {
        Integer ret = null;
        int x = index.getAndIncrement();
        if (x < arr.length) {
            ret = arr[x];
            System.out.println(arr[x] + " by " + Thread.currentThread().getName());
        }
        else {
            throw new RuntimeException("Queue is empty");
        }
        return ret;
    }
}

class QueueTest {
    public static void main(String[] args) {

        SPMCQueueq = new SPMCQueue(40);

        Runnable t1 = () -> {
            try {
            while (true) {
                q.dequeue();
            }
            }catch(Exception e) {

            }
        };

        Runnable t2 = () -> { 
            try {
            while(true) { q.dequeue(); }
            }catch(Exception e) {

            }
        };

        Runnable r3 = () -> { 

            try {
                while(true) { q.dequeue(); }
            } catch (Exception e) {
                // TODO Auto-generated catch block
                //e.printStackTrace();
            }

        };

        Thread thread1 = new Thread(t1);
        Thread thread2 = new Thread(t2);
        Thread thread3 = new Thread(r3);

        thread1.start();
        thread2.start();
        thread3.start();

    }
}

我已经执行了上面的程序,结果显示所有 3 个消费者都在使用数据,尽管它们是无序的,并且一些线程比其他线程消耗更多的数据,但我没有看到任何数据多次出现在 o/p 中。

我有以下问题:

  1. 上面的实现有问题吗?

  2. 还有哪些其他方法可以实现无锁消费者队列?

【问题讨论】:

  • “实现无锁消费者队列的其他方法有哪些?”java.util.concurrent.ConcurrentLinkedQueue
  • “没有消费线程被阻塞”——如果队列变空,你希望消费线程做什么?
  • @AlexeiKaigorodov 假设队列有无限数据。此外,问题更多是关于如何在不使用同步的情况下以无锁方式使用它。
  • @YugSingh 我的意思是,当生产者速度慢而消费者速度快时,队列就会变空。如果不阻止,您希望消费者在这种情况下应该做什么?睡觉和投票?还是在生产下一个项目之前浪费 CPU 周期?

标签: java multithreading queue lock-free


【解决方案1】:

我想连同答案一起给出我的回复:https://stackoverflow.com/a/21101904/7340499,因为它与您的问题相似。

那么,你的问题:

但我没有看到任何数据在 o/p 中多次出现。

这是意料之中的。因为,getAndIncrement() 是原子的,并且无论何时访问该函数,您都会得到不同的 x 值,从而得到不同的输出。 然而,由于在单个非原子出队操作中结合了“getAndIncrement()”和“System.out.print()”函数,您的输出有时可能是无序的,例如你在一个线程上得到 x=1,另一个线程中断,得到 x=2 并打印它,然后你的初始线程完成打印。我相信这也指出了您的实施问题,如(1)中所要求的。您的应用程序是否可以处理乱序的队列?

还有哪些其他实现无锁消费者队列的方法?

好吧,正如您所注意到的,原子操作是一种方式。但本质上,无论如何,它们非常像锁。它们只是在较小的规模上运行(尽管存在一些实现差异)。所以很难将它们概念化为不同的方法,至少对我自己而言。除此之外,我相信这里还有一些不错的资源:@​​987654322@

【讨论】:

  • this 用例中,它与锁并没有太大的不同,但是在读取端不需要任何原子 RMW 的情况下,您可以完美地扩展任何读者数量,读者之间不存在争用。 (例如,一个简单的 SeqLock,甚至是带有 RCU 的无等待阅读器)。即使在这种情况下,读者和作者之间基本上也不会发生争用。但是,多个阅读器必须相互竞争,才能确定哪个阅读器获得哪个队列槽。
  • @PeterCordes 对,所以如果读取不是原子的,那么您可以在 RMW 操作中拥有与您所说的一样多的读取器。然而,我所说的 atomic ~ locks 相似性的意思是,为了使操作成为原子的,CPU/OS/Runtime Env。 (或任何您的环境)必须确保该过程像单周期指令一样完成,这与 x86 和/或 ARM 中的许多汇编指令非常相似。现在,如果进程不是“真正的”原子进程,据我所知,剩下的唯一选择是通过使用某种锁以某种方式限制任何其他干扰当前​​进程的进程。
  • @PeterCordes 所以说,我不会使用锁,我会使用原子操作,就像说我会使用锁,但我称它们为原子操作。如果我错了,或者如果有其他方法生成无锁 RMW 或 RCW 操作,请纠正我,而无需任何类型的锁定/同步机制。我想知道是否有,因为我不再关注 x86 架构了,而且那里有一些非常酷的指令,支持很多很酷的东西,所以我可能不知道。
  • 读取或写入单个寄存器宽度变量通常原子的(并且无锁),无需操作系统的任何参与。例如Why is integer assignment on a naturally aligned variable atomic on x86?。但对于对齐的单词,ARM ldr / str 或 PowerPC lw/sw 或任何其他 ISA 的等效项基本上相同。对于原子 RMW,x86 有 lock add [mem], eax 或其他 (Can num++ be atomic for 'int num'?),而大多数其他 ISA 有某种形式的 LL/SC 重试循环。
  • 原子 RMW 是无锁的,因为线程不可能在持有锁的同时休眠并阻止所有其他试图访问共享变量的线程。这是一个根本的差异。 preshing.com/20130605/the-worlds-simplest-lock-free-hash-table 显示了一个使用无锁原子操作的固定大小哈希表(添加但不删除)的示例,因此写入可以与读取器甚至其他写入器并行发生。
猜你喜欢
  • 2012-01-12
  • 2011-02-11
  • 1970-01-01
  • 2014-10-31
  • 1970-01-01
  • 2011-08-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多