【问题标题】:Best synchronization strategy to block worker threads when a database is down数据库关闭时阻塞工作线程的最佳同步策略
【发布时间】:2015-02-20 16:16:20
【问题描述】:

多个工作线程正在从一个队列进行处理,当数据库发生故障时,它将联系主管,然后锁定所有工作线程并定期轮询数据库,直到数据库启动,然后释放所有线程以便它们可以继续处理。工作线程可以提前或等待处理,主管线程可以锁定或解锁。

我在想这样的界面。你会使用什么同步原语?演员会是一个很好的解决方案,但我没有时间重写。

public interface Latch {

    /**
     * This method will cause a thread(s) to only advance if the latch is in an open state. If the
     * latch is closed the thread(s) will wait until the latch is open before they can advance.
     */
    void advanceWhenOpen();

    /**
     * Close the latch forcing all threads that reaches the latch's advance method to wait until
     * its open again.
     */
    void close();

    /**
     * Opens the latch allowing blocked threads to advance.
     */
    void open();

    boolean isOpen();
}

【问题讨论】:

  • 可能这个问题应该去 CodeReview。
  • 我还没有实现它,我正在寻找解决问题的不同策略。
  • @mkrakhin CodeReview 需要审查一个有效的实现,这只是一个接口的(草稿版本)。
  • 好吧。看来我对CR有一些误解。对不起。
  • 您使用的是哪种队列? BlockingQueue 会为你做这件事。

标签: java multithreading


【解决方案1】:

你想要的并不是真正的“锁存器”——至少《Java 并发实践》一书中说“一旦锁存器到达终端状态,它就不能再改变状态,所以它永远保持打开状态。”

p>

但是您可以在后台使用 CountDownLatch 对象 - 每当您的“Latch”需要关闭时,您可以创建一个计数为 1 的新 CountDownLatch 对象,并在您的 advanceWhenOpen() 中启用 await()。我认为从可读性的角度来看,这将是最好的解决方案。

【讨论】:

    【解决方案2】:

    为此,我将使用ReadWriteLock 作为同步原语。与简单的监视器或互斥锁相比,读/写锁的优点是多个线程可以在任何给定时间持有读锁。当您有很多读取器(例如,在这种情况下是您的线程池)和只有一个或几个写入器(例如,线程检查数据库的打开/关闭)时,这是有利的。

    使用单个监视器或互斥锁,您的线程将在一个锁上序列化,从而使该部分代码有争议。

    【讨论】:

      【解决方案3】:

      一种选择是代理队列以使其在数据库不可用时暂停。工作人员可以在处理时检查队列的暂停状态,并在必要时等待它取消暂停。一个基本的代码演示:

      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.atomic.AtomicReference;
      
      public class PausableQueue<T> {
      
          LinkedBlockingQueue<T> q = new LinkedBlockingQueue<T>();
          AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>(new CountDownLatch(0));
      
          public T take() throws InterruptedException {
      
              awaitPause();
              return q.take();
          }
      
          public void awaitPause() throws InterruptedException {
              pause.get().await();
          }
      
          public void setPaused(boolean paused) {
      
              if (paused) {
                  // only update if there are no threads waiting on current countdown-latch
                  if (!isPaused()) {
                      pause.set(new CountDownLatch(1));
                  }
              } else {
                  pause.get().countDown();
              }
          }
      
          public boolean isPaused() {
              return (pause.get().getCount() > 0L);
          }
      
          /* *** Test the pausable queue *** */
      
          public static void main(String[] args) {
      
              ExecutorService executor = Executors.newCachedThreadPool();
              try {
                  testPause(executor);
              } catch (Exception e) {
                  e.printStackTrace();
              }
              executor.shutdownNow();
          }
      
          private static void testPause(ExecutorService executor) throws Exception {
      
              final PausableQueue<Object> q = new PausableQueue<Object>();
              for (int i = 0; i < 3; i++) {
                  q.q.add(new Object());
              }
              final CountDownLatch tfinished = new CountDownLatch(1);
              Runnable taker = new Runnable() {
      
                  @Override
                  public void run() {
      
                      println("Taking an object.");
                      try {
                          Object o = q.take();
                          println("Got an object: " + o);
                      } catch (Exception e) {
                          e.printStackTrace();
                      } finally {
                          tfinished.countDown();
                      }
                  }
              };
      
              executor.execute(taker);
              tfinished.await();
              final CountDownLatch tstarted2 = new CountDownLatch(2);
              final CountDownLatch tfinished2 = new CountDownLatch(2);
              taker = new Runnable() {
      
                  @Override
                  public void run() {
      
                      println("Taking an object.");
                      tstarted2.countDown();
                      try {
                          Object o = q.take();
                          println("Got an object: " + o);
                      } catch (Exception e) {
                          e.printStackTrace();
                      } finally {
                          tfinished2.countDown();
                      }
                  }
              };
              q.setPaused(true);
              println("Queue paused");
              executor.execute(taker);
              executor.execute(taker);
              tstarted2.await();
              // Pause to show workers pause too
              Thread.sleep(100L);
              println("Queue unpausing");
              q.setPaused(false);
              tfinished2.await();
              // "Got an object" should show a delay of at least 100 ms.
          }
      
          private static void println(String s) {
              System.out.println(System.currentTimeMillis() + " - " + s);
          }
      
      }
      

      【讨论】:

        猜你喜欢
        • 2010-09-08
        • 2017-08-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-10-18
        • 2010-09-21
        • 1970-01-01
        相关资源
        最近更新 更多