【问题标题】:Synchronising thread startup同步线程启动
【发布时间】:2018-12-30 01:34:30
【问题描述】:

我正在运行一些代码(经过简化,但仍会破坏下面的版本),在等待第一次切换的大约 1/3000 次执行中失败。应该发生的是:

  • threads[0] 启动并抓取互斥锁
  • threads[0] 通知cond_main 以便主线程可以创建thread[1]
  • thread[1]/thread[0]做一些工作等待对方的信号

不幸的是,它在 thread[0] 中失败 - cond.wait 以超时结束并引发异常。我将如何同步它,确保cond_main 不会过早收到信号?

理想情况下,我想从主线程传递一个锁定的互斥锁并在第一个生成的线程中解锁它,但 Ruby 要求在同一个线程中解锁互斥锁 - 所以这不起作用。

自包含的复制器(本身没有多大意义,但实际工作被剥离了):

def run_test
  mutex     = Mutex.new
  cond      = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads   = []

  t1_done = false
  t2_done = false

  threads << Thread.new do
    mutex.synchronize do
      # this needs to happen first
      cond_main.signal
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t2_done

      # some work
      t1_done = true
      cond.signal
    end
  end
  cond_main.wait(Mutex.new.lock, 2)

  threads << Thread.new do
    mutex.synchronize do
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end

  threads.map(&:join)
end

5000.times { |x|
  puts "Run #{x}"
  run_test
}

在 Ruby 2.5.3 上测试

【问题讨论】:

    标签: ruby multithreading


    【解决方案1】:

    设置一个 while 块以在第二个线程完成时停止等待(查看更多 here):

    def run_test
      mutex     = Mutex.new
      cond      = ConditionVariable.new
      cond_main = ConditionVariable.new
      threads   = []
    
      spawned = false
    
      t1_done = false
      t2_done = false
    
      threads << Thread.new do
        mutex.synchronize do
          while(!spawned) do
            cond.wait(mutex, 2)
          end
          raise 'timeout waiting for switch' if !t2_done
    
          # some work
          t1_done = true
          cond.signal
        end
      end
    
      threads << Thread.new do
        mutex.synchronize do
          spawned = true
          cond.signal
          # some work
          t2_done = true
          cond.wait(mutex, 2)
          raise 'timeout waiting for switch' if !t1_done
        end
      end
    
      threads.map(&:join)
    end
    
    50000.times { |x| 
      puts x 
      run_test 
    }
    

    或者,使用counting semaphore,我们可以为线程分配一些优先级:

    require 'concurrent-ruby'
    
    def run_test
      mutex     = Mutex.new
      sync      = Concurrent::Semaphore.new(0)
      cond      = ConditionVariable.new
      cond_main = ConditionVariable.new
      threads   = []
    
      t1_done = false
      t2_done = false
    
      threads << Thread.new do
        mutex.synchronize do
          sync.release(1)
          # this needs to happen first
          cond.wait(mutex, 2)
          raise 'timeout waiting for switch' if !t2_done
    
          # some work
          t1_done = true
          cond.signal
        end
      end
    
      threads << Thread.new do
        sync.acquire(1)
        mutex.synchronize do
          cond.signal
          # some work
          t2_done = true
          cond.wait(mutex, 2)
          raise 'timeout waiting for switch' if !t1_done
        end
      end
    
      threads.map(&:join)
    end
    
    50000.times { |x| 
      puts x 
      run_test 
    }
    

    我更喜欢第二种解决方案,因为它允许您控制线程的顺序,虽然感觉有点脏。

    出于好奇,在 Ruby 2.6 上,您的代码似乎没有引发异常(测试超过 10M 次运行)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-01-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-04-30
      • 1970-01-01
      • 2020-08-22
      相关资源
      最近更新 更多