【问题标题】:is there a 'block until condition becomes true' function in java?java中是否有“阻塞直到条件变为真”功能?
【发布时间】:2011-08-25 08:10:18
【问题描述】:

我正在为服务器编写一个侦听器线程,目前我正在使用:

while (true){
    try {
        if (condition){
            //do something
            condition=false;
        }
        sleep(1000);

    } catch (InterruptedException ex){
        Logger.getLogger(server.class.getName()).log(Level.SEVERE, null, ex);
    }
}

使用上面的代码,我遇到了 run 函数占用所有 cpu 时间循环的问题。 sleep 功能有效,但它似乎是临时修复,而不是解决方案。

是否有一些函数会阻塞直到变量“条件”变为“真”? 还是连续循环是等待变量值改变的标准方法?

【问题讨论】:

  • 为什么上面的代码会吃掉你所有的cpu,它似乎每秒只会启动一次。反正看到这个线程:stackoverflow.com/questions/289434/…
  • 有关此主题的完整介绍,请参阅 Java 并发实践 的第 14 章。但更一般地,您可能希望使用更高级别的实用程序,例如 BlockingQueueSemaphoreCountDownLatch,而不是使用低级别机制。

标签: java multithreading block


【解决方案1】:

也可以利用CompletableFutures(从 Java 8 开始):

final CompletableFuture<String> question = new CompletableFuture<>();

// from within the consumer thread:
final String answer = question.get(); // or: event.get(7500000, TimeUnit.YEARS)

// from within the producer thread:
question.complete("42");

【讨论】:

    【解决方案2】:

    与 EboMike 的回答类似,您可以使用类似于 wait/notify/notifyAll 的机制,但准备好使用 Lock

    例如,

    public void doSomething() throws InterruptedException {
        lock.lock();
        try {
            condition.await(); // releases lock and waits until doSomethingElse is called
        } finally {
            lock.unlock();
        }
    }
    
    public void doSomethingElse() {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
    

    您将等待另一个线程通知的某些条件(在这种情况下调用doSomethingElse),此时,第一个线程将继续...

    在内部同步上使用Locks 有很多优点,但我更喜欢使用明确的Condition 对象来表示条件(您可以拥有多个对象,这对于生产者-消费者之类的东西来说是一个很好的接触)。

    另外,我不禁注意到您如何处理示例中的中断异常。您可能不应该使用这样的异常,而是使用Thread.currentThread().interrupt 重置中断状态标志。

    这是因为如果抛出异常,中断状态标志将被重置(它的意思是“我不再记得被打断了,如果他们问,我将无法告诉其他人我被打断了") 和另一个进程可能依赖于这个问题。示例是其他东西已经基于此实现了中断策略……唷。另一个例子可能是您的中断策略,而不是 while(true) 可能已实现为 while(!Thread.currentThread().isInterrupted()(这也将使您的代码更加...社会体贴)。

    因此,总而言之,当您想使用Lock 时,使用Condition 大致相当于使用wait/notify/notifyAll,记录是邪恶的,而吞下InterruptedException 是淘气的;)

    【讨论】:

    • 使用Condition + Lock等同于Object同步方法+synchronized。前者在等待条件之前允许通知——另一方面,如果你在Object.wait() 之前调用Object.notify(),线程将永远阻塞。此外,await() 必须在循环中调用,请参阅文档。
    • @TheOperator 关于“前者允许在等待条件之前发出通知” - 我查看了 Condition 的 Javadoc,但找不到支持此声明的文本。你能解释一下你的说法吗?
    • 示例代码错误,需要循环调用await。请参阅 API 文档了解 Condition。
    • @TheOperator Object.wait() 也必须在循环中调用。
    【解决方案3】:

    EboMike's answerToby's answer 都在正确的轨道上,但它们都包含一个致命的缺陷。该缺陷称为丢失通知

    问题是,如果一个线程调用foo.notify(),它根本不会做任何事情,除非其他线程已经在foo.wait() 调用中休眠。对象foo 不记得它被通知了。

    除非线程在 foo 上同步,否则不允许调用 foo.wait()foo.notify() 是有原因的。这是因为避免丢失通知的唯一方法是使用互斥锁保护条件。正确完成后,它看起来像这样:

    消费者线程:

    try {
        synchronized(foo) {
            while(! conditionIsTrue()) {
                foo.wait();
            }
            doSomethingThatRequiresConditionToBeTrue();
        }
    } catch (InterruptedException e) {
        handleInterruption();
    }
    

    生产者线程:

    synchronized(foo) {
        doSomethingThatMakesConditionTrue();
        foo.notify();
    }
    

    改变条件的代码和检查条件的代码都在同一个对象上同步,消费者线程在等待之前显式地测试条件。当条件已经为真时,消费者无法错过通知并最终永远卡在wait() 调用中。

    还要注意wait() 处于循环中。这是因为,在一般情况下,当消费者重新获得foo 锁并唤醒时,其他线程可能再次使条件为假。即使这在 您的 程序中是不可能的,但在某些操作系统中,即使没有调用 foo.notify()foo.wait() 也可能返回。这称为虚假唤醒,它允许发生,因为它使等待/通知更容易在某些操作系统上实现。

    【讨论】:

    • 我们应该将 try-catch 放在 while 循环内部还是外部?推荐哪种方式,为什么?
    • @JaydevKalivarapu,假设你问的是InterruptedException,对吧?中断的含义由您决定,但在大多数情况下,它可能意味着“停止等待”并执行其他操作(例如,关闭整个程序。)所以在大多数情况下在这种情况下,您会希望它看起来像我上面的示例,中断处理程序位于循环之外。
    • @JaydevKalivarapu, P.S.:当我写上述答案时,我不知道该模式有一个名称:Oracle Java 教程将其称为 受保护块。您可以在docs.oracle.com/javase/tutorial/essential/concurrency/… 阅读有关它的信息
    • @JianGuo, foo.wait() 如果foo 没有被锁定,将会抛出IllegalMonitorStateException。这是为了提醒您,在不持有锁的代码中 wait() 是没有意义的。上面我的回答触及了原因,但如果你想要一个彻底的解释,那么你应该阅读教程。 docs.oracle.com/javase/tutorial/essential/concurrency/…
    • @JianGuo,如果你这样做,那么可能发生的是; (1)消费者测试条件,发现它是假的,(2)消费者尝试进入synchronized(foo),但由于生产者已经在foo上同步而被阻塞,(3)生产者导致条件变为真,调用foo.notify(),然后释放锁,(4)消费者进入synchronized(foo)块,调用foo.wait()。现在消费者被困在等待永远不会到达的通知。这个问题有时被称为“丢失通知”。
    【解决方案4】:

    您可以使用semaphore

    当条件不满足时,另一个线程获取信号量。
    您的线程会尝试使用 acquireUninterruptibly()
    获取它 或tryAcquire(int permits, long timeout, TimeUnit unit) 会被屏蔽。

    当条件满足时,信号量也被释放,你的线程会获取它。

    您也可以尝试使用SynchronousQueueCountDownLatch

    【讨论】:

      【解决方案5】:

      由于没有人发布使用 CountDownLatch 的解决方案。怎么样:

      public class Lockeable {
          private final CountDownLatch countDownLatch = new CountDownLatch(1);
      
          public void doAfterEvent(){
              countDownLatch.await();
              doSomething();
          }
      
          public void reportDetonatingEvent(){
              countDownLatch.countDown();
          }
      }
      

      【讨论】:

      • CountDownLatch 的缺点是不可重用:一旦计数变为 0 就不再可用
      【解决方案6】:

      这样的轮询绝对是最不受欢迎的解决方案。

      我假设你有另一个线程会做一些事情来使条件为真。有几种方法可以同步线程。在您的情况下,最简单的方法是通过对象发出通知:

      主线程:

      synchronized(syncObject) {
          try {
              // Calling wait() will block this thread until another thread
              // calls notify() on the object.
              syncObject.wait();
          } catch (InterruptedException e) {
              // Happens if someone interrupts your thread.
          }
      }
      

      其他话题:

      // Do something
      // If the condition is true, do the following:
      synchronized(syncObject) {
          syncObject.notify();
      }
      

      syncObject 本身可以是一个简单的Object

      还有许多其他的线程间通信方式,但使用哪一种取决于您正在做什么。

      【讨论】:

      • 不客气!请记住,还有其他同步方式,例如信号量、阻塞队列等……这完全取决于您想要做什么。对象是很棒的通用线程同步工具。祝你的应用好运!
      • try catch 应该包装在一个循环中,测试真正的底层条件以防止虚假唤醒(请参阅等待文档)。
      • 值得注意的是,如果先调用 notifyAll,wait() 将永远等待,即使在开始等待之前条件已经满足。
      • 这个答案已经过时了,因为 java.concurent 已经出来了。更干净、更不容易出错的等待方法是使用 Effective Java Ed 2 中的 CountDownLatch
      • @PeterLawrey 还应该注意(甚至在给出这个答案八年后)如果另一个线程开始以这种方式等待,使用 notfify 而不是 notifyAll 可能会导致有趣的效果好吧,因为notify 只通知了其中一个等待线程(假设它是随机的)。
      【解决方案7】:

      无锁解决方案(?)

      我遇到了同样的问题,但我想要一个不使用锁的解决方案。

      问题:我最多有一个线程从队列中消耗。多个生产者线程不断地插入队列,如果消费者在等待,则需要通知消费者。队列是无锁的,因此使用锁进行通知会导致生产者线程中不必要的阻塞。每个生产者线程都需要获取锁才能通知等待的消费者。我相信我想出了一个使用LockSupportAtomicReferenceFieldUpdater 的无锁解决方案。如果 JDK 中存在无锁屏障,我找不到它。 CyclicBarrierCoundDownLatch 在内部都使用我能找到的锁。

      这是我稍微缩写的代码。为了清楚起见,这段代码一次只允许 一个 线程等待。通过使用某种类型的原子集合来存储多个所有者,可以对其进行修改以允许多个等待者/消费者(ConcurrentMap 可能有效)。

      我已经使用了这段代码,它似乎可以工作。我没有对它进行广泛的测试。我建议您在使用前阅读LockSupport 的文档。

      /* I release this code into the public domain.
       * http://unlicense.org/UNLICENSE
       */
      
      import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
      import java.util.concurrent.locks.LockSupport;
      
      /**
       * A simple barrier for awaiting a signal.
       * Only one thread at a time may await the signal.
       */
      public class SignalBarrier {
          /**
           * The Thread that is currently awaiting the signal.
           * !!! Don't call this directly !!!
           */
          @SuppressWarnings("unused")
          private volatile Thread _owner;
      
          /** Used to update the owner atomically */
          private static final AtomicReferenceFieldUpdater<SignalBarrier, Thread> ownerAccess =
              AtomicReferenceFieldUpdater.newUpdater(SignalBarrier.class, Thread.class, "_owner");
      
          /** Create a new SignalBarrier without an owner. */
          public SignalBarrier() {
              _owner = null;
          }
      
          /**
           * Signal the owner that the barrier is ready.
           * This has no effect if the SignalBarrer is unowned.
           */
          public void signal() {
              // Remove the current owner of this barrier.
              Thread t = ownerAccess.getAndSet(this, null);
      
              // If the owner wasn't null, unpark it.
              if (t != null) {
                  LockSupport.unpark(t);
              }
          }
      
          /**
           * Claim the SignalBarrier and block until signaled.
           *
           * @throws IllegalStateException If the SignalBarrier already has an owner.
           * @throws InterruptedException If the thread is interrupted while waiting.
           */
          public void await() throws InterruptedException {
              // Get the thread that would like to await the signal.
              Thread t = Thread.currentThread();
      
              // If a thread is attempting to await, the current owner should be null.
              if (!ownerAccess.compareAndSet(this, null, t)) {
                  throw new IllegalStateException("A second thread tried to acquire a signal barrier that is already owned.");
              }
      
              // The current thread has taken ownership of this barrier.
              // Park the current thread until the signal. Record this
              // signal barrier as the 'blocker'.
              LockSupport.park(this);
              // If a thread has called #signal() the owner should already be null.
              // However the documentation for LockSupport.unpark makes it clear that
              // threads can wake up for absolutely no reason. Do a compare and set
              // to make sure we don't wipe out a new owner, keeping in mind that only
              // thread should be awaiting at any given moment!
              ownerAccess.compareAndSet(this, t, null);
      
              // Check to see if we've been unparked because of a thread interrupt.
              if (t.isInterrupted())
                  throw new InterruptedException();
          }
      
          /**
           * Claim the SignalBarrier and block until signaled or the timeout expires.
           *
           * @throws IllegalStateException If the SignalBarrier already has an owner.
           * @throws InterruptedException If the thread is interrupted while waiting.
           *
           * @param timeout The timeout duration in nanoseconds.
           * @return The timeout minus the number of nanoseconds that passed while waiting.
           */
          public long awaitNanos(long timeout) throws InterruptedException {
              if (timeout <= 0)
                  return 0;
              // Get the thread that would like to await the signal.
              Thread t = Thread.currentThread();
      
              // If a thread is attempting to await, the current owner should be null.
              if (!ownerAccess.compareAndSet(this, null, t)) {
                  throw new IllegalStateException("A second thread tried to acquire a signal barrier is already owned.");
              }
      
              // The current thread owns this barrier.
              // Park the current thread until the signal. Record this
              // signal barrier as the 'blocker'.
              // Time the park.
              long start = System.nanoTime();
              LockSupport.parkNanos(this, timeout);
              ownerAccess.compareAndSet(this, t, null);
              long stop = System.nanoTime();
      
              // Check to see if we've been unparked because of a thread interrupt.
              if (t.isInterrupted())
                  throw new InterruptedException();
      
              // Return the number of nanoseconds left in the timeout after what we
              // just waited.
              return Math.max(timeout - stop + start, 0L);
          }
      }
      

      为了给出一个模糊的用法示例,我将采用 james large 的示例:

      SignalBarrier barrier = new SignalBarrier();
      

      消费线程(单数,不是复数!):

      try {
          while(!conditionIsTrue()) {
              barrier.await();
          }
          doSomethingThatRequiresConditionToBeTrue();
      } catch (InterruptedException e) {
          handleInterruption();
      }
      

      生产者线程:

      doSomethingThatMakesConditionTrue();
      barrier.signal();
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-10-02
        相关资源
        最近更新 更多