【问题标题】:Threading queue in c++C++中的线程队列
【发布时间】:2021-08-02 16:37:11
【问题描述】:

目前正在做一个项目,目前正在与线程和队列作斗争,问题是所有线程都在队列中占用相同的项目。

可重现的例子:

#include <iostream>
#include <queue>
#include <thread>

using namespace std;

void Test(queue<string> queue){
    
    while (!queue.empty()) {
    
        string proxy = queue.front();
        cout << proxy << "\n";
        
        queue.pop();
    
    }
    
}

int main()
{
    
    queue<string> queue;
    
    queue.push("101.132.186.39:9090");
    queue.push("95.85.24.83:8118");
    queue.push("185.211.193.162:8080");
    queue.push("87.106.37.89:8888");
    queue.push("159.203.61.169:8080");
    
    std::vector<std::thread> ThreadVector;
    
    
    for (int i = 0; i <= 10; i++){
        ThreadVector.emplace_back([&]() {Test(queue); });
    }
    
    for (auto& t : ThreadVector){
        t.join();
    }

    ThreadVector.clear();

    return 0;
}

【问题讨论】:

标签: c++ multithreading queue


【解决方案1】:

您为每个线程提供了自己的队列副本。我想你想要的是所有线程都在同一个队列上工作,因此当多个线程在 shared 队列上工作时,你需要使用一些同步机制,因为 std 队列不是线程安全的。

编辑:次要说明:在您的代码中,您生成 11 个线程而不是 10 个。

编辑2:好的,先试试这个:

std::mutex lock_work;
std::mutex lock_io;

void Test(queue<string>& queue){

while (!queue.empty()) {
    string proxy;
    {
        std::lock_guard<std::mutex> lock(lock_work);
        proxy = queue.front();
        queue.pop();
    }
    {
        std::lock_guard<std::mutex> lock(lock_io);
        cout << proxy << "\n";
    }
}   

}

【讨论】:

  • 谢谢,我看看我能做些什么。
【解决方案2】:

看看这个sn-p:

void Test(std::queue<std::string> queue) { /* ... */ }

在这里,您将队列对象的副本传递给线程。

此副本是每个线程的本地副本,因此在每个线程退出后它都会被销毁,因此最终您的程序不会对驻留在 main() 函数中的实际 queue 对象产生任何影响。

要解决此问题,您需要使参数采用引用或指针:

void Test(std::queue<std::string>& queue) { /* ... */ }

这使得参数直接引用存在于main() 中的queue 对象,而不是创建副本。

现在,上面的代码仍然不正确,因为queue 容易出现data-race,而std::queuestd::cout 都不是线程安全的,并且在当前被一个线程访问时可能会被另一个线程中断。为防止这种情况,请使用std::mutex

// ...
#include <mutex>

// ...

// The mutex protects the 'queue' object from being subjected to data-race amongst different threads
// Additionally 'io_mut' is used to protect the streaming operations done with 'std::cout'
std::mutex mut, io_mut;

void Test(std::queue<std::string>& queue) {
    std::queue<std::string> tmp;
    {
        // Swap the actual object with a local temporary object while being protected by the mutex
        std::lock_guard<std::mutex> lock(mut);
        std::swap(tmp, queue);
    }
    while (!tmp.empty()) {
        std::string proxy = tmp.front();
        {
            // Call to 'std::cout' needs to be synchronized
            std::lock_guard<std::mutex> lock(io_mut);
            std::cout << proxy << "\n";
        }
        tmp.pop();
    }
}

这会同步每个线程调用,并防止任何其他线程在 queue 仍在被线程访问时访问。

编辑:

另外,在我看来,让每个线程等待直到其中一个线程收到您推送到std::queue 的通知会快得多。您可以通过使用std::condition_variable 来做到这一点:

// ...

#include <mutex>
#include <condition_variable>

// ...

std::mutex mut1, mut2;
std::condition_variable cond;

void Test(std::queue<std::string>& queue, std::chrono::milliseconds timeout = std::chrono::milliseconds{10}) {
    std::unique_lock<std::mutex> lock(mut1);
    // Wait until 'queue' is not empty...
    cond.wait(lock, [queue] { return queue.empty(); });
    while (!queue.empty()) {
        std::string proxy = std::move(queue.front());
        std::cout << proxy << "\n";
        queue.pop();
    }
}

// ...

int main() {
    std::queue<string> queue;
    
    std::vector<std::thread> ThreadVector;
    
    for (int i = 0; i <= 10; i++)
        ThreadVector.emplace_back([&]() { Test(queue); });
    
    // Notify the vectors of each 'push()' call to 'queue'
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("101.132.186.39:9090");
        cond.notify_one();
    }
    
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("95.85.24.83:8118");
        cond.notify_one();
    }
    
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("185.211.193.162:8080");
        cond.notify_one();
    }
    
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("87.106.37.89:8888");
        cond.notify_one();
    }
    
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("159.203.61.169:8080");
        cond.notify_one();
    }

    for (auto& t : ThreadVector)
        t.join();

    ThreadVector.clear();
}

【讨论】:

  • 这实际上不起作用,就像它只在一个线程上运行一样。我的目标是多线程一个函数同时运行 X 次
  • @VRX 你可以尝试用一个空的临时对象交换引用的队列对象,然后在临时对象上做一些事情,然后你可以用实际对象交换回来。我已将其包含在答案中,但我怀疑这是否会使其更快。
  • 我得到“deque 在 pop 之前为空”,它仍然像在一个线程上一样工作。
  • 我正在使用 cpr 库请求可能是因为这个?
  • 为什么要创建队列的本地 tmp 副本?您正在正确序列化对队列的修改,但实际上存在数据竞争:一个线程可以丢弃另一个线程对队列所做的任何更改。
猜你喜欢
  • 1970-01-01
  • 2023-03-04
  • 1970-01-01
  • 2010-10-22
  • 2014-03-13
  • 2011-02-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多