【发布时间】:2017-10-20 23:53:44
【问题描述】:
我提出了以下阻塞队列实现,其中 std::vector 作为存储在队列中的元素的容器,并使用 Boost 进行线程/同步。我还提到了类似的帖子here。
template<typename T>
class BlockingQueue
{
public:
explicit BlockingQueue(const std::vector<T>& buf):
buffer(buf)
{}
explicit BlockingQueue(): buffer()
{}
void push(const T& elem);
T pop();
~BlockingQueue()
{}
private:
boost::mutex mutex; // mutex variable
boost::condition_variable_any notEmptyCond; // condition variable, to check whether the queue is empty
std::vector<T> buffer;
};
template<typename T>
void BlockingQueue<T>::push(const T& elem)
{
boost::mutex::scoped_lock lock(mutex);
buffer.push_back(elem);
notEmptyCond.notify_one(); // notifies one of the waiting threads which are blocked on the queue
// assert(!buffer.empty());
}
template<typename T>
T BlockingQueue<T>::pop()
{
boost::mutex::scoped_lock lock(mutex);
notEmptyCond.wait(lock,[&](){ return (buffer.size() > 0); }); // waits for the queue to get filled and for a notification, to resume consuming
T elem = buffer.front();
buffer.erase(buffer.begin());
return elem;
}
我有两个线程(生产者/消费者),一个从文件中读取字符串并将它们填充到 BlockingQueue 中,另一个从 BlockingQueue 中删除字符串并打印它们。这两者都是从一个定义如下的类初始化的。
class FileProcessor
{
public:
explicit FileProcessor():bqueue(),inFile("random.txt")
{
rt = boost::thread(boost::bind(&FileVerifier::read, this));
pt1 = boost::thread(boost::bind(&FileVerifier::process, this));
}
volatile ~FileProcessor()
{
rt.interrupt();
pt1.interrupt();
rt.join();
pt1.join();
}
/* Read strings from a file, populate them in the blocking-queue */
void read()
{
std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
boost::iostreams::filtering_istream in;
if (file.fail()) {
std::cout << "couldn't open the input file.. please check its name and read permissions\n";
return;
}
try {
in.push(file);
for(std::string inputStr; std::getline(in,inputStr);)
{
bqueue.push(inputStr);
std::cout << "inserted " << inputStr << "\n";
}
}
catch(std::exception& e) {
std::cout << "exception occurred while reading file\n" << e.what() << "\n";
}
}
/* Process the elements (dequeue and print) */
void process()
{
while (true)
{
std::string rstr = bqueue.pop();
std::cout << "consumed " << rstr << "\n";
}
}
private:
boost::mutex mutex;
boost::thread rt;
boost::thread pt1;
BlockingQueue<std::string> bqueue;
std::string inFile; // input file name from where the strings are read
};
我观察到以下输出(仅包括快照):
运行 1:
inserted AZ
inserted yezjAdCeV
inserted icKU
inserted q
inserted b
inserted DRQL
inserted aaOj
inserted CqlNRv
inserted e
inserted XuDemby
inserted rE
inserted YPk
inserted dLd
inserted xb
inserted bSrZdf
inserted sCQiRna
...
运行 4:
consumed jfRnjSxrw
inserted INdmXSCr
consumed oIDlu
inserted FfXdARGu
consumed tAO
inserted mBq
consumed I
inserted aoXNhP
consumed OOAf
inserted Qoi
consumed wCxJXGWJu
inserted WZGYHluTV
consumed oIFOh
inserted kkIoFF
consumed ecAYyjHh
inserted C
consumed KdrBIixw
inserted Ldeyjtxe
...
我的问题:消费者线程有时可以控制队列的资源(能够出列和打印),有时则不能。我不确定为什么会这样。任何有关队列设计缺陷的提示将不胜感激。谢谢!
观察:
当线程未从 (FileProcessor) 类的 ctor 初始化时,它们会按预期运行,即它们会访问 BlockingQueue 资源并执行读/写操作。请参阅下面的 sn-ps,了解为实现此行为所做的更改。
-
生产者-消费者线程不会轮流,正如@n.m 指出的那样,生产者不会明确地屈服于消费者。根据上述观察,它们各自的输出类似于下面给出的输出
inserted DZxcOw consumed inserted DZxcOw consumed robECjOp robECjOp inserted BaILFsVaA inserted HomURR inserted PVjLPb consumed BaILFsVaA consumed HomURR consumed PVjLPb inserted SHdBVSEyU consumed SHdBVSEyU consumed JaEH inserted JaEH inserted g inserted MwEgOVB inserted qlohoszv consumed g consumed MwEgOVB consumed qlohoszv consumed AsQgq inserted AsQgq inserted tbm inserted iriADeEL inserted Zoxs consumed tbm
从类 ctor 外部初始化。
#include <iostream>
#include <threading/file_processor.h> //has the FileProcessor class declaration
int main()
{
FileProcessor fp; //previously, I had only this statement which called the class constructor, from where the threads were initialized.
boost::thread rt(boost::bind(&FileProcessor::read, &fp));
boost::thread pt1(boost::bind(&FileProcessor::process, &fp));
rt.join();
pt1.join();
return 0;
}
修改的 FileProcessor 类(从其 ctor 中删除了线程初始化)
#include <boost/iostreams/filtering_stream.hpp>
#include <threading/blocking_queue.h> //has the BlockingQueue class
using namespace boost::iostreams;
class FileProcessor
{
public:
explicit FileProcessor():bqueue(),inFile("random.txt")
{}
~FileProcessor()
{}
void read()
{
std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
filtering_istream in;
if (file.fail()) {
std::cout << "couldn't open the input file.. please check its name and read permissions\n";
return;
}
try {
in.push(file);
for(std::string inputStr; std::getline(in,inputStr);)
{
bqueue.push(inputStr);
std::cout << "inserted " << inputStr << "\n";
}
}
catch(std::exception& e) {
std::cout << "exception occurred while reading file\n" << e.what() << "\n";
}
}
void process()
{
while (true)
{
std::string rstr = bqueue.pop();
std::cout << "consumed " << rstr << "\n";
}
}
private:
BlockingQueue<std::string> bqueue;
std::string inFile; // input file name from where the strings are read
};
编辑:
2017 年 5 月 24 日:删除了不准确的注释“将整个文件内容放入缓冲区”。
【问题讨论】:
-
不清楚你的意思。请描述您观察到的情况。尽量不要做出任何结论,只要告诉我们你看到了什么(程序输出,调试器输出,等等)。 minimal reproducible example 是让其他人无需参与猜谜游戏就能了解您的问题的好方法。
-
你应该考虑
boost::concurrent::sync_queue。 -
生产者/消费者是shared_mutex和读/写锁的经典场景。
-
不能保证线程调度是公平的。生产者线程永远不需要停止并让步给其他线程,因此可以合理地假设它可能永远不会停止并让步给其他线程。尝试使用
boost::thread::yield或限制队列的大小,使其在达到最大大小时必须停止。 -
int size() const { return buffer.size(); }是一个巨大的反模式。确实没有办法在不引起竞争条件的情况下使用它。
标签: c++ multithreading c++11 boost producer-consumer