【问题标题】:How to make Win32/MFC threads loop in lockstep?如何使 Win32/MFC 线程同步循环?
【发布时间】:2011-06-24 04:18:55
【问题描述】:

我是 Windows 中的多线程新手,所以这可能是一个微不足道的问题:确保线程同步执行循环的最简单方法是什么?

我尝试将Events 的共享数组传递给所有线程,并在循环结束时使用WaitForMultipleObjects 来同步它们,但这让我在一个(有时是两个)循环后陷入僵局。这是我当前代码的简化版本(只有两个线程,但我想让它可扩展):

typedef struct
{
    int rank;
    HANDLE* step_events;
} IterationParams;

int main(int argc, char **argv)
{
    // ...

    IterationParams p[2];
    HANDLE step_events[2];
    for (int j=0; j<2; ++j)
    {
        step_events[j] = CreateEvent(NULL, FALSE, FALSE, NULL);
    }

    for (int j=0; j<2; ++j)
    {
        p[j].rank = j;
        p[j].step_events = step_events;
        AfxBeginThread(Iteration, p+j);
    }

    // ...
}

UINT Iteration(LPVOID pParam)
{
    IterationParams* p = (IterationParams*)pParam;
    int rank = p->rank;

    for (int i=0; i<100; i++)
    {
        if (rank == 0)
        {
            printf("%dth iteration\n",i);
            // do something
            SetEvent(p->step_events[0]);
            WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
        }
        else if (rank == 1)
        {
            // do something else
            SetEvent(p->step_events[1]);
            WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
        }
    }
    return 0;
}

(我知道我正在混合使用 C 和 C++,它实际上是我正在尝试并行化的遗留 C 代码。)

阅读 MSDN 上的文档,我认为这应该可行。但是,线程 0 只打印一次,偶尔打印两次,然后程序挂起。这是同步线程的正确方法吗?如果没有,您会推荐什么(MFC 中真的没有对屏障的内置支持吗?)。


编辑:这个解决方案是错误,甚至包括Alessandro's fix。例如,考虑这种情况:

  1. 线程 0 设置其事件并调用等待,阻塞
  2. 线程 1 设置其事件并调用等待,阻塞
  3. 线程 0 从等待返回,重置其事件,并在线程 1 不受控制的情况下完成一个循环
  4. 线程 0 设置自己的事件并调用等待。由于线程 1 还没有机会重置其事件,线程 0 的等待立即返回并且线程不同步。

所以问题仍然存在:如何安全地确保线程保持同步?

【问题讨论】:

    标签: c++ multithreading mfc


    【解决方案1】:

    要让它工作,请将CreateEvent 的第二个参数设置为TRUE。这将使事件“手动重置”并防止Waitxxx 重置它。 然后在循环的开头放置一个ResetEvent

    【讨论】:

    • 我不想要 Wait... 重置事件吗?否则线程在第一个周期后不会同步,不是吗?
    • 否,因为第一个返回的等待会重置事件,阻止另一个等待返回。
    • 正确。顺便说一句,Wait 可以在循环结束时组合,而不是在 if 块中:-)
    【解决方案2】:

    我通过谷歌搜索“屏障同步窗口”找到了这个SyncTools(下载 SyncTools.zip)。它使用一个 CriticalSection 和一个 Event 来实现 N 个线程的屏障。

    【讨论】:

      【解决方案3】:

      简介

      我实现了一个简单的 C++ 程序供您参考(在 Visual Studio 2010 中测试)。它仅使用 Win32 API(以及用于控制台输出的标准库和一些随机化)。你应该可以把它放到一个新的 Win32 控制台项目中(没有预编译的头文件),编译并运行。


      解决方案

      #include <tchar.h>
      #include <windows.h>
      
      
      //---------------------------------------------------------
      // Defines synchronization info structure. All threads will
      // use the same instance of this struct to implement randezvous/
      // barrier synchronization pattern.
      struct SyncInfo
      {
          SyncInfo(int threadsCount) : Awaiting(threadsCount), ThreadsCount(threadsCount), Semaphore(::CreateSemaphore(0, 0, 1024, 0)) {};
          ~SyncInfo() { ::CloseHandle(this->Semaphore); }
          volatile unsigned int Awaiting; // how many threads still have to complete their iteration
          const int ThreadsCount;
          const HANDLE Semaphore;
      };
      
      
      //---------------------------------------------------------
      // Thread-specific parameters. Note that Sync is a reference
      // (i.e. all threads share the same SyncInfo instance).
      struct ThreadParams
      {
          ThreadParams(SyncInfo &sync, int ordinal, int delay) : Sync(sync), Ordinal(ordinal), Delay(delay) {};
          SyncInfo &Sync;
          const int Ordinal;
          const int Delay;
      };
      
      
      //---------------------------------------------------------
      // Called at the end of each itaration, it will "randezvous"
      // (meet) all the threads before returning (so that next
      // iteration can begin). In practical terms this function
      // will block until all the other threads finish their iteration.
      static void RandezvousOthers(SyncInfo &sync, int ordinal)
      {
          if (0 == ::InterlockedDecrement(&(sync.Awaiting))) { // are we the last ones to arrive?
              // at this point, all the other threads are blocking on the semaphore
              // so we can manipulate shared structures without having to worry
              // about conflicts
              sync.Awaiting = sync.ThreadsCount;
              wprintf(L"Thread %d is the last to arrive, releasing synchronization barrier\n", ordinal);
              wprintf(L"---~~~---\n");
      
              // let's release the other threads from their slumber
              // by using the semaphore
              ::ReleaseSemaphore(sync.Semaphore, sync.ThreadsCount - 1, 0); // "ThreadsCount - 1" because this last thread will not block on semaphore
          }
          else { // nope, there are other threads still working on the iteration so let's wait
              wprintf(L"Thread %d is waiting on synchronization barrier\n", ordinal);
              ::WaitForSingleObject(sync.Semaphore, INFINITE); // note that return value should be validated at this point ;)
          }
      }
      
      
      //---------------------------------------------------------
      // Define worker thread lifetime. It starts with retrieving
      // thread-specific parameters, then loops through 5 iterations
      // (randezvous-ing with other threads at the end of each),
      // and then finishes (the thread can then be joined).
      static DWORD WINAPI ThreadProc(void *p)
      {
          ThreadParams *params = static_cast<ThreadParams *>(p);
          wprintf(L"Starting thread %d\n", params->Ordinal);
      
          for (int i = 1; i <= 5; ++i) {
              wprintf(L"Thread %d is executing iteration #%d (%d delay)\n", params->Ordinal, i, params->Delay);
              ::Sleep(params->Delay); 
              wprintf(L"Thread %d is synchronizing end of iteration #%d\n", params->Ordinal, i);
              RandezvousOthers(params->Sync, params->Ordinal);
          }
      
          wprintf(L"Finishing thread %d\n", params->Ordinal);
          return 0;
      }
      
      
      //---------------------------------------------------------
      // Program to illustrate iteration-lockstep C++ solution.
      int _tmain(int argc, _TCHAR* argv[])
      {
          // prepare to run
          ::srand(::GetTickCount()); // pseudo-randomize random values :-)
          SyncInfo sync(4);
          ThreadParams p[] = {
              ThreadParams(sync, 1, ::rand() * 900 / RAND_MAX + 100), // a delay between 200 and 1000 milliseconds will simulate work that an iteration would do
              ThreadParams(sync, 2, ::rand() * 900 / RAND_MAX + 100),
              ThreadParams(sync, 3, ::rand() * 900 / RAND_MAX + 100),
              ThreadParams(sync, 4, ::rand() * 900 / RAND_MAX + 100),
          };
      
          // let the threads rip
          HANDLE t[] = {
              ::CreateThread(0, 0, ThreadProc, p + 0, 0, 0),
              ::CreateThread(0, 0, ThreadProc, p + 1, 0, 0),
              ::CreateThread(0, 0, ThreadProc, p + 2, 0, 0),
              ::CreateThread(0, 0, ThreadProc, p + 3, 0, 0),
          };
      
          // wait for the threads to finish (join)
          ::WaitForMultipleObjects(4, t, true, INFINITE);
      
          return 0;
      }
      

      样本输出

      在我的机器(双核)上运行这个程序会产生以下输出:

      Starting thread 1
      Starting thread 2
      Starting thread 4
      Thread 1 is executing iteration #1 (712 delay)
      Starting thread 3
      Thread 2 is executing iteration #1 (798 delay)
      Thread 4 is executing iteration #1 (477 delay)
      Thread 3 is executing iteration #1 (104 delay)
      Thread 3 is synchronizing end of iteration #1
      Thread 3 is waiting on synchronization barrier
      Thread 4 is synchronizing end of iteration #1
      Thread 4 is waiting on synchronization barrier
      Thread 1 is synchronizing end of iteration #1
      Thread 1 is waiting on synchronization barrier
      Thread 2 is synchronizing end of iteration #1
      Thread 2 is the last to arrive, releasing synchronization barrier
      ---~~~---
      Thread 2 is executing iteration #2 (798 delay)
      Thread 3 is executing iteration #2 (104 delay)
      Thread 1 is executing iteration #2 (712 delay)
      Thread 4 is executing iteration #2 (477 delay)
      Thread 3 is synchronizing end of iteration #2
      Thread 3 is waiting on synchronization barrier
      Thread 4 is synchronizing end of iteration #2
      Thread 4 is waiting on synchronization barrier
      Thread 1 is synchronizing end of iteration #2
      Thread 1 is waiting on synchronization barrier
      Thread 2 is synchronizing end of iteration #2
      Thread 2 is the last to arrive, releasing synchronization barrier
      ---~~~---
      Thread 4 is executing iteration #3 (477 delay)
      Thread 3 is executing iteration #3 (104 delay)
      Thread 1 is executing iteration #3 (712 delay)
      Thread 2 is executing iteration #3 (798 delay)
      Thread 3 is synchronizing end of iteration #3
      Thread 3 is waiting on synchronization barrier
      Thread 4 is synchronizing end of iteration #3
      Thread 4 is waiting on synchronization barrier
      Thread 1 is synchronizing end of iteration #3
      Thread 1 is waiting on synchronization barrier
      Thread 2 is synchronizing end of iteration #3
      Thread 2 is the last to arrive, releasing synchronization barrier
      ---~~~---
      Thread 2 is executing iteration #4 (798 delay)
      Thread 3 is executing iteration #4 (104 delay)
      Thread 1 is executing iteration #4 (712 delay)
      Thread 4 is executing iteration #4 (477 delay)
      Thread 3 is synchronizing end of iteration #4
      Thread 3 is waiting on synchronization barrier
      Thread 4 is synchronizing end of iteration #4
      Thread 4 is waiting on synchronization barrier
      Thread 1 is synchronizing end of iteration #4
      Thread 1 is waiting on synchronization barrier
      Thread 2 is synchronizing end of iteration #4
      Thread 2 is the last to arrive, releasing synchronization barrier
      ---~~~---
      Thread 3 is executing iteration #5 (104 delay)
      Thread 4 is executing iteration #5 (477 delay)
      Thread 1 is executing iteration #5 (712 delay)
      Thread 2 is executing iteration #5 (798 delay)
      Thread 3 is synchronizing end of iteration #5
      Thread 3 is waiting on synchronization barrier
      Thread 4 is synchronizing end of iteration #5
      Thread 4 is waiting on synchronization barrier
      Thread 1 is synchronizing end of iteration #5
      Thread 1 is waiting on synchronization barrier
      Thread 2 is synchronizing end of iteration #5
      Thread 2 is the last to arrive, releasing synchronization barrier
      ---~~~---
      Finishing thread 4
      Finishing thread 3
      Finishing thread 2
      Finishing thread 1
      

      请注意,为简单起见,每个线程都有随机的迭代持续时间,但该线程的所有迭代都将使用相同的随机持续时间(即迭代之间不会改变)。


      它是如何工作的?

      解决方案的“核心”在“RandezvousOthers”函数中。此函数将阻塞共享信号量(如果调用此函数的线程不是最后一个调用该函数的线程)或重置同步结构并解除阻塞共享信号量上的所有线程(如果调用此函数的线程函数被调用是最后一个调用函数)。

      【讨论】:

      • 代表后代,感谢您发布完整的示例。我实现了相同的逻辑(共享计数器和信号量),它似乎工作得很好。
      猜你喜欢
      • 1970-01-01
      • 2015-08-08
      • 2010-10-14
      • 2011-05-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-08
      • 1970-01-01
      相关资源
      最近更新 更多