【发布时间】:2016-07-07 23:16:14
【问题描述】:
所以,我正在编写一种类似于示波器的程序,它读取计算机上的串行端口并对这些数据执行 fft 以将其转换为频谱。我遇到了一个问题,虽然我的程序布局分为SerialHandler 类(使用boost::Asio)、FFTHandler 类和main 函数。 SerialHandler 类使用boost::Asio`` async_read_some 函数从端口读取并引发名为HandleOnPortReceive 的事件,然后该事件本身读取数据。
问题是我无法找到一种方法将由另一个线程上的io_service 对象引发的事件处理程序中的数据传递给另一个线程上的FFTHandler 类。有人建议我使用信号量来解决我的问题,但是我对 semaphore.h 的用法几乎一无所知,所以我的实现现在已经很糟糕了,并没有做任何应该做的事情。
下面是一些代码,如果这样可以让它更清晰一点:
using namespace Foo;
//main function
int main(void){
SerialHandler serialHandler;
FFTHandler fftHandler;
sem_t *qSem_ptr = &qSem;
sem_init(qSem_ptr, 1, 0);
//create separate threads for both the io_service and the AppendIn so that neither will block the user input statement following
serialHandler.StartConnection(tempInt, tempString); //these args are defined, but for brevity's sake, I ommitted the declaration
t2= new boost::thread(boost::bind(&FFTHandler::AppendIn, &fftHandler, q, qSem));
//allow the user to stop the program and avoid the problem of an infinite loop blocking the program
char inChar = getchar();
if (inChar) {...some logic to stop reading}
}
namespace Foo{
boost::thread *t1;
boost::thread *t2;
sem_t qSem;
std::queue<double> q;
boost::mutex mutex_;
class SerialHandler{
private:
char *rawBuffer; //array to hold incoming data
boost::asio::io_service ioService;
boost::asio::serial_port_ptr serialPort;
public:
void SerialHandler::StartConnection(int _baudRate, string _comPort){
//some functionality to open the port that is irrelevant to the question goes here
AsyncReadSome(); //starts the read loop
//create thread for io_service object and let function go out of scope
t1 = new boost::thread(boost::bind(&boost::asio::io_service::run, &ioService));
}
void SerialHandler::AsyncReadSome(){
//there's some other stuff here for error_catching, but this is the only important part
serialPort->async_read_some (
boost::asio::buffer(rawBuffer, SERIAL_PORT_READ_BUF_SIZE),
boost::bind(
&SerialHandler::HandlePortOnReceive,
this, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred, q));
}
void SerialHandler::HandlePortOnReceive(const boost::system::error_code& error, size_t bytes_transferred, std::queue<double>& q){
boost::mutex::scoped_lock lock(mutex_);
//more error checking goes here, but I've made sure they aren't returning and are not the issue
for (unsigned int i =0; i<bytes_transferred; i++){
unsigned char c = rawBuffer[i];
double d = (double) c; //loop through buffer and read
if (c==endOfLineChar){
} else //if not delimiting char, push into queue and post semaphore
{
q.push(d);
//cout << d << endl;
sem_post(&qSem);
cout << q.front() << endl;
cout << "size is: " << q.size() << endl;
}
}
//loop back on itself and start the next read
AsyncReadSome();
}
}
class FFTHandler{
private:
double *in; //array to hold inputs
fftw_complex *out; //holds outputs
int currentIndex;
bool filled;
const int N;
public:
void AppendIn(std::queue<double> &q, sem_t &qSem){
while(1){ //this is supposed to stop thread from exiting and going out of scope...it doesn't do that at all effectively...
cout << "test" << endl;
sem_wait(&_qSem); //wait for data...this is blocking but I don't know why
double d = _q.front();
_q.pop();
in[currentIndex]=d; //read queue, pop, then append in array
currentIndex++;
if (currentIndex == N){ //run FFT if full and reset index
currentIndex = N-overlap-1;
filled = true;
RunFFT();
}
}
}
}
}
FFTHandler::AppendIn(..) 中的调试行确实正在触发,因此正在创建线程,但它似乎立即超出范围并破坏了线程,因为似乎我已经设置了一段时间以错误地响应信号。
TLDR:这是一个很长的解释,简单地说,“我不了解信号量,但需要以某种方式实现它们。我尝试过,失败了,所以现在我来这里希望能收到请比我知识渊博的人帮忙处理这段代码。
更新:所以在玩弄了一些调试语句之后,似乎问题在于while(1){...} 语句确实在触发,但是sem_wait(&_qSem); 导致它阻塞。无论出于何种原因,它都会无限期地等待,尽管信号量正在发布,但它会继续等待并且永远不会超出该线。
【问题讨论】:
-
我看不到 信号量使用 有什么特别的问题,但我可以看到线程的潜在问题 - 例如,
main是否在线程完成之前退出? (从你刚才说的代码中不清楚“......停止阅读的一些逻辑”) -
为什么 SerialHandler 和 FFTHandler 在不同的线程上运行?如果目的是将工作传递给另一个线程以便 SerialHandler 可以接收下一组数据,那么您可能需要考虑领导者/跟随者模式并拥有一个运行 io_service 的线程池。在这里,每个线程都将使用 SerialHandler 读取数据并使用 FFTHandler 处理数据,一旦完成,将返回池中等待下一个 io 事件。
-
@aichao 我让 SerialHandler io_service 在一个单独的线程上运行,因为它在处理程序中循环回自身,因此它创建了一个无限循环。但是,我希望能够停止程序,并且不可停止的无限循环并不是一个理想的功能,所以我将它移到另一个线程,以便能够接受用户输入,最终在用户喜欢它时停止这个循环。出于类似的原因,
FFTHandler::AppendIn(...)函数位于其自己的线程上,即不阻塞主线程并释放用户以在程序运行时执行操作(即停止程序)。 -
不要将
boost::bind与boost::thread一起使用,这是多余的。只需将参数直接传递给boost::thread构造函数。 -
@imp903 好的,我想我知道你现在想要做什么。我的评论的重点是,您可以将 io_service 用作事件队列(而不仅仅是 I/O 事件队列)并让多个线程处理事件,包括您现在拥有的 I/O 事件以及“队列已满” FFT”事件,然后由 FFTHandler 处理。单独的 I/O 和 FFT 事件可以在单独的线程上同时发生,因为一个线程读取更多数据,而另一个线程正在处理前一组数据;都没有阻塞主线程,这是你的意图。如果有兴趣,我可以发布答案。
标签: c++ multithreading boost-asio semaphore