【问题标题】:Lock-free variant of wait/notify等待/通知的无锁变体
【发布时间】:2015-08-27 10:22:15
【问题描述】:

在调用o.wait()o.notify() 之前,Java 需要一个线程拥有o 的监视器。这是众所周知的事实。但是,任何此类机制都需要互斥锁才能正常工作吗?如果有提供的 API 会怎样

compareAndWait

setAndNotify

相反,将 CAS 操作与线程调度/取消调度相结合?这会有一些好处:

  • 即将进入等待状态的线程不会阻碍通知线程的进度;

  • 他们也不必在被允许检查等待条件之前互相等待;

  • 在通知端,任意数量的生产者线程可以同时进行。

提供这样的 API 是否存在根本的、不可克服的障碍?

【问题讨论】:

  • Lock 是在不使用等待/通知的情况下编写的。它在引擎盖下使用 CAS。
  • 但是我们仍然需要获取 Lock 才能使用它的 Condition。
  • 您能稍微扩展一下compareAndWaitsetAndNotify 的作用吗? compareAndWait 听起来像 Semaphore,而 setAndNotify 听起来像 BlockingQueue.put()
  • @OldCurmudgeon 与信号量的类比是一个很好的类比。所以结论可能是这样的功能确实存在,wait/notify 特别适用于您实际需要锁定的情况。
  • “等待/通知的无锁变体”是什么意思?无锁算法是一种保证在任何时候都至少有一个参与线程能够取得进展的算法。但是,当算法的目的是让线程等待某事时,“取得进展”是什么意思?

标签: java multithreading


【解决方案1】:

使用LockSupport.park()LockSupport.unpark(Thread) 实现任意等待/通知机制没有问题,因为这些基本原语不需要持有任何锁。

Object.wait/Object.notifyCondition.await/Condition.signal 在没有锁定的情况下向您提供这样的通知的原因是语义。通知的概念是一个线程等待一个条件被满足,而另一个线程在条件变为满足状态时停止等待。如果不持有与该条件关联的锁,则无法保证条件在测试条件状态和线程状态更改之间不会发生变化。

更具体地说,当一个更改条件的线程通知另一个线程时,可能会在通知发生之前再次修改条件。但更糟糕的是,条件可能会在线程开始到 wait 之前更改为“已完成”,在这种情况下,线程可能会错过通知并永远挂起。

即使你能够将条件测试和等待操作融合为一个原子操作,也无济于事。等待一个条件本身并不是目的。线程想要等待条件的原因是它想要执行以条件为先决条件的动作,因此在执行动作时不能改变。这就是重点:条件测试和操作必须作为一个持有锁的操作来实现,而不管锁的概念是如何实现的。

在某些特殊情况下不会出现此类问题,例如当已知条件的状态转换是有限的时,您可以排除条件可以返回到未完成的状态。这正是CountDownLatchCyclicBarrierPhaser 等工具的用途,但具有预定义的等待/通知语义的通知机制意味着不假设这种特殊情况。

【讨论】:

  • 这与我的想法基本一致。但是为什么wait/notify坚持锁上呢?为什么不让实施者决定他们是否需要它?我认为这与机制的实现方式有关。
  • 此外,唤醒后的动作与唤醒一起原子发生并不是必需的:在弱化之后,我们可以尝试获取工作项,如果那里没有任何事情发生,则返回睡眠。实际上,必须使用锁定方法来实现相同的事情。但是协调等待集与实际进入休眠的线程集可能会有困难。
  • 我不认为这是依赖于实现的。即使坚持持有锁,仍有很多事情可能做错,并且很难想象不持有锁不是错误的用例。尤其是程序员问“为什么我必须拥有锁”或只是对IllegalMonitorStateException 感到惊讶的所有这些情况,这些都是程序员没有意识到持有锁是他们所要求的语义的例子。这样做,否则他们不会因为不持有锁而绊倒。我怀疑是否有任何反例。
  • 是的,在检查条件和调用 wait 之间会有竞争。我认为这是必不可少的,但不是唤醒后的代码。这就是为什么我提出compareAndWait 来解决那个特定的比赛。但是compareAndWait 会有维护其内部等待集的问题,因此它可能归结为实现层面的根本障碍。
  • “唤醒后我们可以尝试获取工作项”——这意味着获取工作项的尝试在任何方面都是线程安全的。而且我无法想象有什么理由让该获取使用与物品可用性条件检查不同的锁。当然,如果您事后重新检查,您可以在两者之间释放该锁定,但这样做有什么好处?这会将更多必要的检查与更多的锁获取操作结合起来。
【解决方案2】:

比一些真正的工作代码更像是一个思想实验,但这似乎有效。

// My lock class.
public static class Padlock<E extends Enum<E>> {

    // Using Markable because I think I'm going to need it.
    public final AtomicReference<E> value;
    // Perhaps use a set to maintain all waiters.
    Set<Thread> waiters = ConcurrentHashMap.newKeySet();

    public Padlock(E initialValue) {
        this.value = new AtomicReference<>(initialValue);
    }

    /**
     * Waits for the locks value to become the specified key value.
     *
     * @param waitFor - The desired key.
     */
    public void compareAndWait(E waitFor) {
        log("Wait for " + waitFor);
        // Spin on the value.
        while (value.get() != waitFor) {
            log("Park waiting for " + waitFor);
            // Remember me as waiting.
            waiters.add(Thread.currentThread());
            // TODO: What do we do here??
            LockSupport.park();
            log("Awoke " + waitFor);
        }
    }

    /**
     * Sets the locks value to the key value.
     *
     * If this resulted in a change - notify all changers.
     *
     * @param shouldBe - What it should be now.
     * @param makeIt - The new value to set.
     */
    public void setAndNotify(E shouldBe, E makeIt) {
        log("Set " + shouldBe + "->" + makeIt);
        if (value.compareAndSet(shouldBe, makeIt)) {
            log("Notify " + shouldBe + "->" + makeIt);
            // It changed! Notify the waiters.
            for (Thread t : waiters) {
                // Perhaps
                log("Unpark " + t.getName());
                LockSupport.unpark(t);
            }
        }
    }
}

enum State {

    Off, On;
}

private static final long TESTTIME = 30000;
private static final long TICK = 100;

private static final void log(String s) {
    System.out.println(Thread.currentThread().getName() + ": " + s);

}

static class MutexTester implements Runnable {

    final Padlock<State> lock;

    public MutexTester(Padlock<State> lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        Thread.currentThread().setName(this.getClass().getSimpleName());
        long wait = System.currentTimeMillis() + TESTTIME;
        do {
            // Wait for an On!
            lock.compareAndWait(Test.State.On);
            try {
                log("Got it!");
                try {
                    Thread.sleep(TICK);
                } catch (InterruptedException ex) {
                    log("Interrupted!");
                }
            } finally {
                // Release
                lock.setAndNotify(Test.State.On, Test.State.Off);
            }
        } while (System.currentTimeMillis() < wait);
        log("Done");
    }
}

static class RandomSwitcher implements Runnable {

    final Padlock<State> lock;
    final Random random = new Random();

    public RandomSwitcher(Padlock<State> lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        Thread.currentThread().setName(this.getClass().getSimpleName());
        long wait = System.currentTimeMillis() + TESTTIME;
        do {
            // On!
            lock.setAndNotify(Test.State.Off, Test.State.On);
            log("On!");
            pause();
            lock.setAndNotify(Test.State.On, Test.State.Off);
            log("Off!");
            pause();
        } while (System.currentTimeMillis() < wait);
        log("Done");
    }

    private void pause() {
        try {
            // Random wait.
            Thread.sleep(TICK * random.nextInt(10));
        } catch (InterruptedException ex) {
            System.out.println("Interrupted! " + Thread.currentThread().getName());
        }
    }
}

public void test() throws InterruptedException {
    final Padlock<State> lock = new Padlock<>(State.Off);
    Thread t1 = new Thread(new MutexTester(lock));
    t1.start();
    Thread t2 = new Thread(new RandomSwitcher(lock));
    t2.start();
    t1.join();
    t2.join();
}

我已经实现了compareAndWait 等待独占使用而setAndNotify 释放互斥锁的协议。

【讨论】:

  • 干得好 :) 我认为进入等待状态然后被唤醒的首选原语是park/unpark。这些也解决了“错过通知”问题,因为它们的语义涉及粘性许可。但我认为你在评论中提到Semaphore 已经死了。 setAndNotifyrelease 完全一样:信号量的许可不属于线程。
  • 对于并发集,您可以使用newSetFromMap(new ConcurrentHashMap() 或者更好的是ConcurrentHashMap.newKeySet()
  • @MarkoTopolnik - 看起来LockSupport 可能有一些有用的机制。
  • 是的,我指的是park,unpark
  • @MarkoTopolnik - 我知道有理由使用AtomicMarkableReference!!现在没有时间修复它 - 抱歉。希望我给了你一个好的开始。
【解决方案3】:

首先,Java 的内置监视器(synchronizedwait)比许多人想象的要高效。请参阅biased locking,并计划进一步改进以利用硬件事务内存。

其次,您正在寻找的机制与synchronized/wait 提供的机制有不同的用途。后者保护一些受保护的资源,并且必须包含一个锁,因为它假定在wait 之后您希望进入临界区。您正在寻找的内容由其他 Java 并发原语提供,例如 CountDownLatchPhaserSemaphore

【讨论】:

  • 偏向锁定仅适用于非竞争锁定,因此与此处无关。而且我不指望特定实现的任何开销,而只是互斥的基本价格。
  • 但我同意第二点。我认为Semaphore 是最好的匹配。
  • 偏向的“锁”只是 HotSpot 和类似 JVM 中整个锁膨胀机制的一部分。查看更多here
  • 谢谢,但您似乎认为我的无知比实际存在的要多。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-27
相关资源
最近更新 更多