【问题标题】:STL Queue with two threads(Producer, Consumer)具有两个线程(生产者、消费者)的 STL 队列
【发布时间】:2013-01-28 13:19:00
【问题描述】:

我想要队列安全的关键部分,以便线程不会同时访问队列。即使我评论与关键部分相关的行,此代码也有效。 谁能解释一下为什么?

queue<int> que;
CRITICAL_SECTION csection;
int i=0;

DWORD WINAPI ProducerThread(void*)
{

    while(1)
    {
        //if(TryEnterCriticalSection(&csection))
        {
            cout<<"Pushing value "<<i<<endl;
            que.push(i++);
            //LeaveCriticalSection(&csection);
        }
    }
}

//Consumer tHread that pops out the elements from front of queue
DWORD WINAPI ConsumerThread(void*)
{
    while(1)
    {
        //if(TryEnterCriticalSection(&csection))
        {
            if(!que.empty())
            {
                cout<<"Value in queue is "<<que.front()<<endl;
                que.pop();
            }
            else
                Sleep(2000);
            //LeaveCriticalSection(&csection);
        }
    }
}

int _tmain(int argc, _TCHAR* argv[])
{
    HANDLE handle[2];
    //InitializeCriticalSection(&csection);
    handle[0]=NULL;
    handle[1]=NULL;
    handle[0]=CreateThread(0,0,(LPTHREAD_START_ROUTINE)ProducerThread,0,0,0);
    if(handle[0]==NULL)
        ExitProcess(1);

    handle[1]=CreateThread(0,0,(LPTHREAD_START_ROUTINE)ConsumerThread,0,0,0);
    if(handle[1]==NULL)
        ExitProcess(1);

    WaitForMultipleObjects(2,handle,true,INFINITE);

    return 0;
}

【问题讨论】:

  • 定义“作品”。 AFAICS 进程在启动第一个线程后立即退出。
  • @R.MartinhoFernandes 尽管没有缩进,ExitProcess 调用是有条件的,只在错误情况下运行
  • @simonc 哦,你是对的。孩子们,这就是为什么你应该正确缩进你的代码。
  • 您有多核处理器(或多个处理器)吗?您运行了多长时间,您是否尝试过不使用“cout”[只需检查您现在获得的值+1 是否等于您下一次获得的值]。

标签: c++ multithreading winapi thread-safety


【解决方案1】:

偶然起作用,大概有两个原因:

  1. 它不起作用,但您从未注意到。消费者拉出队列中的任何东西,或者它认为队列中的任何东西。如果什么都没有,它会一直休眠,直到生产者推送了一些东西。这“有效”是因为生产者只追加到末尾,而消费者只从头开始读取。除了更新size。您很可能最终会有一个队列处于存在元素但size 没有反映它的状态。这是令人讨厌的,但相反的情况(迟早也会发生)更令人讨厌。
    你没有办法知道。好吧,您最终可能会知道,排队的工作项是否由于某种原因“消失”或内存不足,但请尝试找出原因。
  2. 你使用printf(或std::cout,同理),内部被临界区锁定。这种“类型”以您需要的方式锁定对队列的访问,除非它不需要。它将在 99.9% 的时间内工作(偶然,因为消费者将被阻止尝试打印,这比追加到队列的生产者需要更长的时间来唤醒)。但是,当在打印之后发生上下文切换时,它会突然失败。砰,你死定了。

您确实绝对需要使用关键部分对象或互斥锁来保护关键代码部分。否则,结果是不可预测的。与人们可能相信的相反,“但它有效”不是一件好事,它是可能发生的最糟糕的事情。因为它只在它不起作用之前起作用,然后你不知道为什么。

也就是说,您可以使用 IO 完成端口,它可以非常高效地为您完成所有工作。您可以使用GetQueuedCompletionStatus 从端口拉出一个“事件”并使用PostQueuedCompletionStatus 发布一个。完成端口完成队列的整个处理,包括为您与多个消费者进行适当的同步(并且它以 LIFO 顺序执行,这有利于避免上下文切换和缓存失效)。
每个事件都包含一个指向 OVERLAPPED 结构的指针,但完成端口不使用它,您可以只传递 any 指针(或者,如果您觉得这样更好,则传递一个指向OVERLAPPED 后跟您自己的数据)。

【讨论】:

  • “你使用 printf(或 std::cout,相同),它在内部被临界区锁定”。请解释这一行。您的意思是调用“cout”的成本不仅仅是推送/弹出一个元素,或者这意味着两个线程不能使用控制台屏幕同时打印消息?谢谢,我快到了。
  • 调用任何这样的函数,例如coutputs 来自同一控制台/stdout/任何明显的线程上的一个或多个线程会导致乱码。作为一个“单元”进入的东西作为一个单元出来。虽然这不是标准严格要求的(不是在 Windows 下,无论如何),但您永远不会看到其他任何事情发生。这必然意味着无论是在 CRT 内还是在较低级别,都会进行某种适当的同步(否则这将无法正常工作!)。因此,通过打印,您在不知情的情况下获取并释放了一个临界区。
  • 但同样,这可能会意外地隐藏您缺乏适当的同步,但这不足以保证推送/弹出队列按预期工作。它实际上只能保证打印正常工作。为确保您的代码正常工作(可靠,而不是偶然!),您绝对需要代码中的关键部分(或使用完成端口,它在内部完成)。
【解决方案2】:

在您的特定情况下, cout 将比“get”花费数百倍的时间。并且当队列为空时您会睡觉,这允许其他线程在您的“消费者”线程获取任何队列之前填充大量队列。

全速运行(无调试打印,无睡眠),确保运行很长时间,并用简单的数学检查另一端的值。

类似这样的:

int old_val = val;
while(1)
{
    if(!que.empty())
    {
       int  val = que.front();

       que.pop();
       if (old_val+1 != val)
       {
          /// Do something as things have gone wrong!
       }
     }
}

请注意,这也可能不会立即/微不足道地出错。你想运行它几个小时,最好在机器上运行其他东西——比如一个批处理文件:

@echo off
:again 
dir c:\ /s > NUL:
goto again

[自从我为 Windows 编写批处理脚本已经有一段时间了,所以这可能不是 100% 正确,但我认为你应该能够在谷歌上搜索我出错的任何答案——这个想法是“中断“机器]。

另外,请尝试运行您的一对线程的多个副本,每对有一个单独的队列 - 这将强制执行更多的调度活动,并可能引发问题。

就像 Anton 所说,其中一些东西通常很难重现。我在实时操作系统中遇到了一个问题,队列被弄乱了——唯一真正的迹象是内存最终在“压力测试”期间耗尽(它会做“随机”的事情,包括几个不同的中断源)。该操作系统已经在数百个单元的生产测试中进行了测试,并且作为真正的生产系统在现场进行了测试[并且在不同处理器上运行的相同代码再次在世界各地操作电话交换机,没有客户抱怨内存泄漏] ,貌似没有内存泄漏!但是队列处理中的一个“漏洞”,在一个函数中,只是运行。在认为是压力测试本身偶尔会遇到一些奇怪的队列建立的情况后,我最终发现了实际的问题 - 队列的读写之间的中断命中 - 恰好有两个指令的漏洞,并且只有当一个中断例程在发送消息时被另一个中断例程中断...我宁愿不再调试那个!

【讨论】:

  • Mats Peterson,如果我注释掉“cout”行(因为 cout 操作的成本远高于将元素推入/弹出队列的成本)。代码仍在运行(尽管我在控制台上看不到任何东西)。它不会崩溃。我仍然担心这种奇怪的行为。我想让你解释一下关于这段代码的更多信息。
  • 因此您需要将您从队列中取出的值与“预期”值进行比较。
  • 是的,我确实愿意,我想弹出每个元素。作为证据,我将它们打印在屏幕上。另一件事是,被推入队列的整数是完全按升序排列的(i,i+1,i+2,.....)
  • 是的,但是就像我说的,打印需要 100 倍的时间[并且很可能包含同步以使另一个线程与这个线程同步运行]。这些东西不容易测试 - 但我上面的代码应该会有所帮助。对于这类事情,20k 并不是一个很高的数字。我会运行它,直到它多次包装整数 - 这可能需要几个小时,但是当你的大程序中缺少一个依赖于在另一端出现的每一个排队的东西时,尝试找出问题所在肯定比试图找出问题要好.
【解决方案3】:

CRITICAL_SECTION 防止的那种错误的最大问题之一是 很难重现它们。您必须预测它会如何失败而无法展示它。

当您保护自己的代码而不是包装非线程安全的库调用时,通常可以通过在某个地方添加Sleep 来触发竞争条件。在您发布的代码中,生产者没有机会这样做(无论不变量被破坏,它都在que.push 内部完成),并且消费者检查空队列的潜在TOCTTOU 问题在存在时不存在只有一个消费者。如果我们可以将Sleep 添加到队列实现中,那么我们就能以可预测的方式使事情出错。

【讨论】:

  • Mats Peterson,我有双核 PC,Simoc 是 ExitProcess 调用是有条件的。这段代码有效(我已经检查过它和 i 的值,全局变量达到 20,000)我不知道两个线程如何在没有任何锁定(关键部分,信号量)机制的情况下访问同一个队列
【解决方案4】:

如果只有一个生产者和一个消费者,队列代码可能对这种弹出轮询是安全的。如果生产者推送使用临时索引/指针将数据插入到下一个空队列位置,并且仅将递增的“临时索引”存储到队列“下一个空”成员中,则 queue.empty 可以返回 true,直到它是安全的消费者要弹出的数据。这种操作可能是偶然设计的,也可能是偶然发生的。

一旦你有多个生产者或多个消费者,它肯定会爆炸,迟早。

编辑 - 即使队列证明对于一个生产者和一个消费者来说是安全的,除非它被记录在案,否则你不应该依赖它 - 一些 b***d 将更改实现下一个版本:(

【讨论】:

    猜你喜欢
    • 2016-08-30
    • 2015-09-25
    • 1970-01-01
    • 2013-09-30
    • 1970-01-01
    • 1970-01-01
    • 2023-03-06
    • 2012-01-28
    • 1970-01-01
    相关资源
    最近更新 更多