【问题标题】:Worker thread suspend / resume implementation工作线程挂起/恢复实现
【发布时间】:2017-09-04 08:21:17
【问题描述】:

在尝试向我的 Worker [线程] 类添加挂起/恢复功能时,我遇到了一个我无法解释的问题。 (C++1y / VS2015)

这个问题看起来像一个死锁,但是一旦附加了调试器并且在某个点之前设置了断点(参见 #1),我似乎无法重现它 - 所以它看起来像是一个时间问题。

我可以找到的修复 (#2) 对我来说没有多大意义,因为它需要更长时间地持有互斥锁,并且客户端代码可能会尝试获取其他互斥锁,我理解这实际上会增加死锁的可能性。

但它确实解决了问题。

工人循环:

Job* job;
while (true)
{
  {
    std::unique_lock<std::mutex>  lock(m_jobsMutex);
    m_workSemaphore.Wait(lock);

    if (m_jobs.empty() && m_finishing)
    {
      break;
    }

    // Take the next job
    ASSERT(!m_jobs.empty());
    job = m_jobs.front();
    m_jobs.pop_front();
  }

  bool done = false;
  bool wasSuspended = false;
  do
  {
    // #2
    { // Removing this extra scoping seemingly fixes the issue BUT
      // incurs us holding on to m_suspendMutex while the job is Process()ing,
      // which might 1, be lengthy, 2, acquire other locks.
      std::unique_lock<std::mutex> lock(m_suspendMutex);
      if (m_isSuspended && !wasSuspended)
      {
        job->Suspend();
      }
      wasSuspended = m_isSuspended;

      m_suspendCv.wait(lock, [this] {
        return !m_isSuspended;
      });

      if (wasSuspended && !m_isSuspended)
      {
        job->Resume();
      }
      wasSuspended = m_isSuspended;
    }

    done = job->Process();
  }
  while (!done);
}

暂停/恢复只是:

void Worker::Suspend()
{
  std::unique_lock<std::mutex>  lock(m_suspendMutex);
  ASSERT(!m_isSuspended);
  m_isSuspended = true;
}

void Worker::Resume()
{
  {
    std::unique_lock<std::mutex>  lock(m_suspendMutex);
    ASSERT(m_isSuspended);
    m_isSuspended = false;
  }
  m_suspendCv.notify_one(); // notify_all() doesn't work either.
}

(Visual Studio)测试:

  struct Job: Worker::Job
  {
    int durationMs = 25;
    int chunks = 40;
    int executed = 0;

    bool Process()
    {
      auto now = std::chrono::system_clock::now();
      auto until = now + std::chrono::milliseconds(durationMs);
      while (std::chrono::system_clock::now() < until)
      { /* busy, busy */
      }

      ++executed;
      return executed < chunks;
    }

    void Suspend() { /* nothing here */ }
    void Resume() { /* nothing here */ }
  };

  auto worker = std::make_unique<Worker>();

  Job j;
  worker->Enqueue(j);

  std::this_thread::sleep_for(std::chrono::milliseconds(j.durationMs));  // Wait at least one chunk.

  worker->Suspend();

  Assert::IsTrue(j.executed < j.chunks);  // We've suspended before we finished.
  const int testExec = j.executed;

  std::this_thread::sleep_for(std::chrono::milliseconds(j.durationMs * 4));

  Assert::IsTrue(j.executed == testExec); // We haven't moved on.

  // #1
  worker->Resume(); // Breaking before this call means that I won't see the issue.
  worker->Finalize();

  Assert::IsTrue(j.executed == j.chunks); // Now we've finished.

我错过了什么/做错了什么?为什么作业的 Process()ing 必须由 suspend 互斥锁来保护?

编辑Resume() 在通知时不应该持有互斥锁;已解决 - 问题仍然存在。

【问题讨论】:

    标签: multithreading concurrency c++14 worker-thread


    【解决方案1】:

    当然,作业的Process()ing 不必由suspend 互斥锁来保护。

    j.executed 的访问 - 用于断言以及递增 - 但是确实需要同步(通过将其设为 std::atomic&lt;int&gt; 或使用互斥锁等方式保护它)。

    仍然不清楚为什么问题会以这种方式表现出来(因为我没有写入主线程上的变量)——可能是undefined behaviour propagating backwards in time 的情况。

    【讨论】:

      猜你喜欢
      • 2013-07-20
      • 2012-05-20
      • 2011-07-09
      • 2014-06-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-22
      相关资源
      最近更新 更多