【问题标题】:Synchronization output streams from different threads同步来自不同线程的输出流
【发布时间】:2012-10-18 12:14:18
【问题描述】:

我正在编写一个应用程序,它将来自不同线程的一些数据写入 xml 文件。我尝试使用事件核心对象同步它,但在文件中我得到错误的数据。我得到下一个结果

<file path="somePath" />
<file path="somePath" <file path="somePath" /> />....

但我希望得到

<file path="somePath" />
<file path="somePath" />
<file path="somePath" />

请看下面我的伪代码。有什么问题?

unsigned int WINAPI MyThread(void *p)
{
    std::wofstream outstr;
    outstr.open("indexingtest.xml", std::ios::app);
    do
    {
        if(somePredicat1)
        {
            WaitForSingleObject(hEvent, INFINITE);
            outstr <<"<file path=\""<< sFileName << "\"\n";
            outstr <<"\tsize=\""<< fileSize << "\" />\n";           
            ReleaseMutex(hMutex);
        }
         if(somePredicat3)
         {
             MyThread(sFileName);
         }
    }while(somePredicat2);
    outstr.close();
    FindClose( hSearch );
    return 0;
}

int _tmain(int argc, TCHAR *argv[])
{
    hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    //hMutex = CreateMutex(NULL, FALSE, 0);
    unsigned int ThreadID;
    HANDLE hThread1 = (HANDLE)_beginthreadex(NULL, 0, MyThread, L"D:\\*", 0, &ThreadID);
    HANDLE hThread2 = (HANDLE)_beginthreadex(NULL, 0, MyThread, L"C:\\*", 0, &ThreadID);
    SetEvent(hEvent);
    std::wcout << "\a" << std::endl;
    WaitForSingleObject( hThread1, INFINITE );
    return 0;
}

更具体的代码

HANDLE hMutex = CreateMutex(NULL,FALSE, 0);
wchar_t** GetAllFilesImpl( wchar_t const* folder, wchar_t** res, size_t* pAllocated, size_t* pUsed )
{
    HANDLE hSearch;
    WIN32_FIND_DATAW fileinfo;
    size_t allocatedMemory = 0;


    hSearch = FindFirstFileW( folder, &fileinfo );
    if( hSearch != INVALID_HANDLE_VALUE ) {
        do {

            wchar_t* sFileName, ** tmp, sTmp[ 1024 ];
            long fileSize = 0;
            long creationDate;
            /* ignore ., .. */
            if( !wcscmp(fileinfo.cFileName, L".") ||
                !wcscmp(fileinfo.cFileName, L"..") )
                continue;
            sFileName = PathCreator( folder, fileinfo.cFileName );
            fileSize = fileinfo.nFileSizeLow;
            creationDate = fileinfo.ftCreationTime.dwHighDateTime;


            if(fileSize)
            {
                WaitForSingleObject(hMutex, INFINITE);
                std::wofstream outstr;
                            outstr.open("indexingtest.xml", std::ios::app);
                outstr.seekp(std::ios_base::end);
                outstr <<"<file path=\""<< sFileName << "\"\n";
                outstr <<"\tsize=\""<< fileSize << "\" />\n";
                outstr.seekp(std::ios_base::end);
                outstr.close();
                wprintf( L"%s\n", sFileName);
                ReleaseMutex(hMutex);
            }

            tmp = AddToArray( res, pAllocated, pUsed, sFileName );
            if( !tmp ) return FreeAllFilesMemory(res), NULL;
            res = tmp;

            if( fileinfo.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY ) {
                wcscpy_s( sTmp, sFileName );
                wcscat_s( sTmp, L"\\*" );
                tmp = GetAllFilesImpl( sTmp, res, pAllocated, pUsed );
                if( !tmp ) return NULL;
                res = tmp;
            }
        } while( FindNextFileW(hSearch, &fileinfo) );

        FindClose( hSearch );
    }
    return res;
}

unsigned int WINAPI GetAllFiles( void* folder )
{
    size_t nAllocated = 0, nUsed = 0;
    wchar_t** res = GetAllFilesImpl( (wchar_t *)folder, NULL, &nAllocated, &nUsed );
    if( res ) {
        /* to indicate end of result add a NULL string */
        wchar_t** tmp = AddToArray( res, &nAllocated, &nUsed, NULL );
        if( !tmp ) return FreeAllFilesMemory(res), -1;
        res = tmp;
    }
    std::wcout << "\a" << std::endl;
    return 0;
}
int _tmain(int argc, TCHAR *argv[])
{

    Sleep(1000);
    unsigned int ThreadID;
    HANDLE hThreads[3];
    hThreads[0] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"D:\\*", 0, &ThreadID);
    hThreads[1] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"C:\\Users\\Andrew\\Desktop\\*", 0, &ThreadID);
    hThreads[2] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"E:\\*", 0, &ThreadID);
    unsigned int dw = WaitForMultipleObjects(3, hThreads, TRUE, INFINITE);
    CloseHandle(hFile);
    printf("finished\n");
    return 0;
}

【问题讨论】:

    标签: c++ multithreading winapi synchronization wofstream


    【解决方案1】:

    您遇到的 问题是每个线程都在单独打开文件。而是在创建线程之前打开文件,然后使用互斥锁同步对文件的写入。

    在伪代码中:

    std::wofstream output_file;
    
    void my_thread()
    {
        do
        {
            if (some_condition)
            {
                lock_mutex();
                do_output();
                unlock_mutex();
            }
        } while (condition);
    }
    
    int main()
    {
        output_file.open(...);
    
        create_thread();
        create_thread();
    
        output_file.close();
    }
    

    【讨论】:

    • 这样,并非所有数据都已写入文件,并且混合出现。我忘了说线程函数中有递归。看问题的版本。
    【解决方案2】:
    1. 你应该在结束代码之前等待所有线程

      HANDLE aThread[2];
      
      ...
      
      aThread[0] =  (HANDLE)_beginthreadex(...
      aThread[1] =  (HANDLE)_beginthreadex(...
      
      WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
      
    2. 在输出之前您正在等待事件。完成输出后 你释放互斥锁。这根本不符合逻辑。你应该等待互斥锁和 事件。设置事件后,两个线程都将等待释放。就这样 互斥锁不做任何事情。 当您将事件句柄和互斥量句柄放在一个数组中时,您可以使用 WaitForMultipleObjects 也用于此目的:

      HANDLE hVarious[2];
      hVarious[0] = CreateEvent(NULL, TRUE, FALSE, NULL);
      // Note: this is a manual reset event. 
      // Thus is stays set until explicitly reset
      
      hVarious[1] = CreateMutex(NULL, FALSE, 0);
      
      // and now start the threads:
      aThread[0] =  (HANDLE)_beginthreadex(...
      aThread[1] =  (HANDLE)_beginthreadex(...
      
      // and set the event:
      SetEvent(hEvent);
      
      
      WaitForMultipleObjects(2, aThread, TRUE, INFINITE);
      

      然后线程应该如下所示:

      unsigned int WINAPI MyThread(void *p)
      {
        do
        {
          if(somePredicat1)
          {
            // wait for the mutex AND the event
            WaitForMultipleObjects(2, hVarious, TRUE, INFINITE);
      
            // do the file stuff in the mutex protected part       
            std::wofstream outstr;
            outstr.open("indexingtest.xml", std::ios::app);
      
            outstr <<"<file path=\""<< sFileName << "\"\n";
            outstr <<"\tsize=\""<< fileSize << "\" />\n";           
      
            outstr.close();
            FindClose( hSearch );
            ReleaseMutex(hVarious[1]);
          }
        }while(somePredicat2);
        return 0;
      }
      

    记住:互斥锁的建立是为了保护并发应用程序中的资源。

    我不知道somePredicat1somePredicat1。这些参数在不同线程中使用时也可能会出现问题。但是,您观察到的错误输出是由错误的互斥锁使用引起的。

    评论后编辑:

        if(somePredicat3)
        {
             MyThread(sFileName);
        }
    

    一个。线程本身作为函数调用而不关闭文件。

    b.您应该提供更多关于 somePredicat3, somePredicat2, and somePredicat1 用途的详细信息。

    c。您必须使用某种排他性来保护输出文件,因为它已被使用 通过不止一个线程。您也可以使用Critical Section Object 这样做。

    【讨论】:

    • 我的代码中没有使用互斥锁,这是一个错字。它必须被评论。我想使用事件核心对象进行同步。我正在尝试 WaitForMultipleObjects() 但它没有帮助。我不知道怎么了。我的代码应该输出数以千计的字符串,并且几乎所有的都是正确的,但是对于他中的大约十个是混合的(包含 2 行)。有什么问题...
    • 一个错字...嗯。因此,您已经尝试过使用互斥锁来解决问题。这是正确的方法,你为什么放弃它。请查看编辑后的答案。
    • 我认为使用 CRITICAL_SETION 我会得到相同的结果,而且我担心会出现线程阻塞。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-09
    • 2012-07-01
    • 2016-12-19
    相关资源
    最近更新 更多