【问题标题】:Creating a Happens Before Relationship with AtomicBoolean使用 AtomicBoolean 创建发生在关系之前
【发布时间】:2017-08-27 07:38:22
【问题描述】:

阅读此代码AsyncSubscriber.java: 编码器使用 AtomicBoolean 创建 Happens Before 关系,我想知道:

1_ 是否等同于使用同步块? 看起来线条 if (on.get()) 不保证阻止

try {
            final Signal s = inboundSignals.poll(); // We take a signal off the queue


 if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
          // Below we simply unpack the `Signal`s and invoke the corresponding methods
          if (s instanceof OnNext<?>)
            handleOnNext(((OnNext<T>)s).next);
          else if (s instanceof OnSubscribe)
            handleOnSubscribe(((OnSubscribe)s).subscription);
          else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
            handleOnError(((OnError)s).error);
          else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
            handleOnComplete();
        }
      }

一次会被1个线程执行。

确实当on.get()返回true时,是什么阻止了另一个线程进入临界区?!

2_ 它比同步块更有效吗? (假设 AtomicBoolean 使用 Volatile 变量)

这里是代码部分:

    // We are using this `AtomicBoolean` to make sure that this `Subscriber` doesn't run concurrently with itself,
      // obeying rule 2.7 and 2.11
      private final AtomicBoolean on = new AtomicBoolean(false);

       @SuppressWarnings("unchecked")
       @Override public final void run() {
        if(on.get()) { // establishes a happens-before relationship with the end of the previous run
          try {
            final Signal s = inboundSignals.poll(); // We take a signal off the queue
            if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
              // Below we simply unpack the `Signal`s and invoke the corresponding methods
              if (s instanceof OnNext<?>)
                handleOnNext(((OnNext<T>)s).next);
              else if (s instanceof OnSubscribe)
                handleOnSubscribe(((OnSubscribe)s).subscription);
              else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
                handleOnError(((OnError)s).error);
              else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
                handleOnComplete();
            }
          } finally {
            on.set(false); // establishes a happens-before relationship with the beginning of the next run
            if(!inboundSignals.isEmpty()) // If we still have signals to process
              tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
          }
        }
      }
// What `signal` does is that it sends signals to the `Subscription` asynchronously
  private void signal(final Signal signal) {
    if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
      tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
  }

  // This method makes sure that this `Subscriber` is only executing on one Thread at a time
  private final void tryScheduleToExecute() {
    if(on.compareAndSet(false, true)) {
      try {
        executor.execute(this);
      } catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13
        if (!done) {
          try {
            done(); // First of all, this failure is not recoverable, so we need to cancel our subscription
          } finally {
            inboundSignals.clear(); // We're not going to need these anymore
            // This subscription is cancelled by now, but letting the Subscriber become schedulable again means
            // that we can drain the inboundSignals queue if anything arrives after clearing
            on.set(false);
          }
        }
      }
    }

3_ 安全吗?

4_ 它是否常用于此目的(创建发生在关系之前)?

【问题讨论】:

标签: java multithreading reactive


【解决方案1】:

是的,对 AtomicBolean 的写入/读取建立了先发生关系:

compareAndSet 和所有其他读取和更新操作,例如 getAndIncrement 兼具读写的记忆效果 可变变量。

由于您没有发布整个代码,而且我们不知道它是如何使用的,所以很难说它是否是线程安全的,但是:

ad 1. 它不等同于同步块-线程不等待

ad 2. 是的,它可能更有效,但 compareAndSwap 没有义务由 volatile 变量支持 - 这是实现的数据。

ad 3. 很难说,但run 是一个公共方法这一事实暴露了一些错误的可能性,例如,如果两个线程将直接调用run,而go 将具有true 的值.在我看来,直接在run 方法中进行 compareAndSwap 会更好,但我不知道所有要求,所以这只是一个建议。

ad 4. 是的,AtomicBoolean 是常用的。

【讨论】:

  • 我编辑了我的问题,在我看来,这种情况并不能确保提及的块一次由一个线程执行?
  • 它确保了这一点,但前提是run 方法没有被tryScheduleToExecute 以外的任何东西调用或调度。
  • 关于你的第二个答案,我打开了 AtomicBoolean 的实现,它使用 volatile boolean
  • @NassimMOUALEK 是的,看来至少在 java 7 中 AtomicBoolean 是由 volatile int value; 支持的,但这只是实现的细节。
猜你喜欢
  • 1970-01-01
  • 2021-02-06
  • 1970-01-01
  • 1970-01-01
  • 2017-07-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多