【问题标题】:Reading from multiple nonblocking named pipes in Linux从 Linux 中的多个非阻塞命名管道读取
【发布时间】:2014-01-02 02:10:52
【问题描述】:

基于 stackoverflow 中位于 here 的类似示例, 我有三个命名管道,管道_a、管道_b 和管道_c,它们是从外部进程提供的。我想要一个读取器进程,它输出到控制台,无论写入任何这些管道。

下面的程序是一个一体式c程序,应该以非阻塞的方式读取三个管道,并在其中任何一个管道获得新数据时显示输出。

但是,它不起作用 - 它正在阻塞!如果 pipe_a 得到数据,它会显示它,然后等待新数据到达 pipe_b 等...

select() 应该允许监视多个文件描述符,直到一个准备好,此时我们应该进入管道的读取函数并获取数据。

谁能帮助确定为什么管道的行为就像处于阻塞模式?

/*
 * FIFO example using select.
 *
 * $ mkfifo /tmp/fifo
 * $ clang -Wall -o test ./test.c
 * $ ./test &
 * $ echo 'hello' > /tmp/fifo
 * $ echo 'hello world' > /tmp/fifo
 * $ killall test
 */

#include <sys/types.h>
#include <sys/select.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>


// globals
int fd_a, fd_b, fd_c;
int nfd_a, nfd_b, nfd_c;
fd_set set_a, set_b, set_c;
char buffer_a[100*1024];
char buffer_b[100*1024];
char buffer_c[100*1024];


int readPipeA()
{
ssize_t bytes;
size_t total_bytes;

if (FD_ISSET(fd_a, &set_a)) {
    printf("\nDescriptor %d has new data to read.\n", fd_a);
    total_bytes = 0;
    for (;;) {
        printf("\nDropped into read loop\n");
        bytes = read(fd_a, buffer_a, sizeof(buffer_a));
        if (bytes > 0) {
            total_bytes += (size_t)bytes;
            printf("%s", buffer_a);
        } else {
            if (errno == EWOULDBLOCK) {
                printf("\ndone reading (%ul bytes)\n", total_bytes);
                break;
            } else {
                perror("read");
                return EXIT_FAILURE;
            }
        }
    }
}
}

int readPipeB()
{
ssize_t bytes;
size_t total_bytes;

if (FD_ISSET(fd_b, &set_b)) {
    printf("\nDescriptor %d has new data to read.\n", fd_b);
    total_bytes = 0;
    for (;;) {
        printf("\nDropped into read loop\n");
        bytes = read(fd_b, buffer_b, sizeof(buffer_b));
        if (bytes > 0) {
            total_bytes += (size_t)bytes;
            printf("%s", buffer_b);
        } else {
            if (errno == EWOULDBLOCK) {
                printf("\ndone reading (%ul bytes)\n", total_bytes);
                break;
            } else {
                perror("read");
                return EXIT_FAILURE;
            }
        }
    }
}
}

int readPipeC()
{
ssize_t bytes;
size_t total_bytes;

if (FD_ISSET(fd_c, &set_c)) {
    printf("\nDescriptor %d has new data to read.\n", fd_c);
    total_bytes = 0;
    for (;;) {
        printf("\nDropped into read loop\n");
        bytes = read(fd_c, buffer_c, sizeof(buffer_c));
        if (bytes > 0) {
            total_bytes += (size_t)bytes;
            printf("%s", buffer_c);
        } else {
            if (errno == EWOULDBLOCK) {
                printf("\ndone reading (%ul bytes)\n", total_bytes);
                break;
            } else {
                perror("read");
                return EXIT_FAILURE;
            }
        }
    }
}
}


int main(int argc, char* argv[])
    {


    // create pipes to monitor (if they don't already exist)
    system("mkfifo /tmp/PIPE_A");
    system("mkfifo /tmp/PIPE_B");
    system("mkfifo /tmp/PIPE_C");


    // open file descriptors of named pipes to watch
    fd_a = open("/tmp/PIPE_A", O_RDWR | O_NONBLOCK);
    if (fd_a == -1) {
    perror("open");
    return EXIT_FAILURE;
    }
    FD_ZERO(&set_a);
    FD_SET(fd_a, &set_a);


    fd_b = open("/tmp/PIPE_B", O_RDWR | O_NONBLOCK);
    if (fd_b == -1) {
    perror("open");
    return EXIT_FAILURE;
    }
    FD_ZERO(&set_b);
    FD_SET(fd_b, &set_b);


    fd_c = open("/tmp/PIPE_C", O_RDWR | O_NONBLOCK);
    if (fd_c == -1) {
    perror("open");
    return EXIT_FAILURE;
    }
    FD_ZERO(&set_c);
    FD_SET(fd_c, &set_c);



    for(;;)
    {
        // check pipe A
        nfd_a= select(fd_a+1, &set_a, NULL, NULL, NULL);
        if (nfd_a) {
            if (nfd_a == -1) {
                perror("select");
                return EXIT_FAILURE;
            }
            readPipeA();
        }

        // check pipe B
        nfd_b= select(fd_b+1, &set_b, NULL, NULL, NULL);
        if (nfd_b) {
            if (nfd_b == -1) {
                perror("select");
                return EXIT_FAILURE;
            }
            readPipeB();
        }

        // check pipe C
        nfd_c= select(fd_c+1, &set_c, NULL, NULL, NULL);
        if (nfd_c) {
            if (nfd_c == -1) {
                perror("select");
                return EXIT_FAILURE;
            }
            readPipeC();
        }
    }

    return EXIT_SUCCESS;
}

--- 更新代码---

根据此处的反馈修改了应用程序,并阅读了更多内容:

    /*
     * FIFO example using select.
     *
     * $ mkfifo /tmp/fifo
     * $ clang -Wall -o test ./test.c
     * $ ./test &
     * $ echo 'hello' > /tmp/fifo
     * $ echo 'hello world' > /tmp/fifo
     * $ killall test
     */
    
    #include <sys/types.h>
    #include <sys/select.h>
    #include <sys/time.h>
    #include <sys/types.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <fcntl.h>
    #include <unistd.h>
    
    
    int readPipe(int fd)
    {
        ssize_t bytes;
        size_t total_bytes = 0;
        char buffer[100*1024];
    
        printf("\nDropped into read pipe\n");
        for(;;) {
            bytes = read(fd, buffer, sizeof(buffer));
            if (bytes > 0) {
                total_bytes += (size_t)bytes;
                printf("%s", buffer);
            } else {
                if (errno == EWOULDBLOCK) {
                    printf("\ndone reading (%d bytes)\n", (int)total_bytes);
                    break;
                } else {
                    perror("read");
                    return EXIT_FAILURE;
                }
            }
        }
        return EXIT_SUCCESS;
    }
    
    
    int main(int argc, char* argv[])
    {
        int fd_a, fd_b, fd_c;   // file descriptors for each pipe
        int nfd;                // select() return value
        fd_set read_fds;        // file descriptor read flags
        struct timeval tv;
        tv.tv_sec = 0;
        tv.tv_usec = 0;
    
        // create pipes to monitor (if they don't already exist)
        system("mkfifo /tmp/PIPE_A");
        system("mkfifo /tmp/PIPE_B");
        system("mkfifo /tmp/PIPE_C");
    
        // open file descriptors of named pipes to watch
        fd_a = open("/tmp/PIPE_A", O_RDWR | O_NONBLOCK);
        if (fd_a == -1) {
            perror("open");
            return EXIT_FAILURE;
        }
    
        fd_b = open("/tmp/PIPE_B", O_RDWR | O_NONBLOCK);
        if (fd_b == -1) {
            perror("open");
            return EXIT_FAILURE;
        }
    
        fd_c = open("/tmp/PIPE_C", O_RDWR | O_NONBLOCK);
        if (fd_c == -1) {
            perror("open");
            return EXIT_FAILURE;
        }
    
        FD_ZERO(&read_fds);
        FD_SET(fd_a, &read_fds);  // add pipe to the read descriptor watch list
        FD_SET(fd_b, &read_fds);
        FD_SET(fd_c, &read_fds);
    
        for(;;)
        {
            // check if there is new data in any of the pipes
            nfd = select(fd_a+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select");
                    return EXIT_FAILURE;
                }
    
                if (FD_ISSET(fd_a, &read_fds)) {
                    readPipe(fd_a);
                }
            }
    
            nfd = select(fd_b+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select");
                    return EXIT_FAILURE;
                }
    
                if (FD_ISSET(fd_b, &read_fds)){
                    readPipe(fd_b);
                }
            }
            nfd = select(fd_c+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select");
                    return EXIT_FAILURE;
                }
                if (FD_ISSET(fd_c, &read_fds)){
                    readPipe(fd_c);
                }
            }
    
            usleep(10);
        }
        return EXIT_SUCCESS;
    }

当任何一个受监视管道中有数据等待时,选择返回零 (0) 仍然存在问题?我一定不能正确使用select()fd_isset()。你能看出我做错了什么吗?谢谢。

【问题讨论】:

  • 缩小你的问题。做一个测试用例。这代码太多了。例如,是否只使用两个管道会再次出现问题?那么你的代码将只有这个大小的 66%。
  • 问题是选择功能被阻塞了。我理解 select() 检查标志以查看读取是否“将”阻塞,以便可以决定是否执行读取。管道正在以 RDWR 和 NONBLOCK 模式打开。 select的参数可能需要更改,我接下来会检查。
  • 请阅读我最初的评论。如果唯一的问题是你有一个阻塞选择应该是一个非阻塞管道,你可能会生成一个 10 行的程序来证明这一点。
  • 顺便说一句,这是很多不必要的重复代码!

标签: c++ c linux named-pipes


【解决方案1】:

问题是选择功能被阻塞。我理解 select() 检查标志以查看读取是否“将”阻塞,以便可以决定是否执行读取。管道正在以 RDWR 和 NONBLOCK 模式打开。

您说问题在于 select 函数被阻塞,但继续承认 NONBLOCK 标志只会使 read 阻塞。选择和阅读是两个不同的东西。

O_NONBLOCK 标志会影响 socket(因此会影响您的 read 调用);它确实不会改变select 的行为,它有自己的超时/阻塞语义。

man select 表示两个数字成员都设置为零的 timeout 参数会产生非阻塞轮询,而 NULL 的超时参数可能会导致无限期阻塞:

如果 timeout 参数是一个空指针,那么对 pselect()select() 的调用将无限期阻塞,直到至少一个描述符符合指定的标准。要进行轮询,timeout 参数不应为空指针,而应指向零值 timespec timeval 结构。

(注意页面上方的文字表明,虽然pselect() 采用timespec 结构,但select() 采用timeval 结构;我冒昧地将这个逻辑应用于以上引用。)

因此,在每个select 调用之前构造一个timeval,将其成员设置为零,并将其传递给select

一些注意事项,当我们在这里时:

  1. 理想情况下,您只需要 一个 select 调用,同时检查所有三个文件描述符,然后通过使用 @987654338 检查您的 FD 集来决定从哪些管道到 read @;

  2. 我还建议在循环体的末尾添加一点 usleep,否则当数据不足时,您的程序会非常非常快地旋转。

【讨论】:

  • 我什至不确定这是 OP 的主要问题。他有 3 个独立的 selects 和 3 个独立的集合。它们应该合二为一。
  • @Duck:他们应该这样做,但我不明白这会如何导致他的问题。
  • +1 不妨提一下fd_isset。有趣的小误会。
  • 感谢您的反馈。关于结构(和大小),我试图扩展我在问题顶部引用的管道示例,并且不想通过传递参数使 pipeRead 函数复杂化,因此只是复制它。我为 select 调用添加了带零的 timeval 结构,由于某种未知原因导致 1 秒超时,我稍后会担心(在 OS X 上测试)。我确实看到我在滥用选择,因为我应该只有一个并使用 fd_isset 来确定要读取的管道......接下来会尝试。
  • 我将三组读取标志组合成一组。由于有三个管道,并且每个管道都有自己的句柄,我相信 select 需要为每个单独的管道句柄调用。我对上面显示的代码进行了更新,但它仍然无法正常工作。
【解决方案2】:

这是我读取三个命名管道的工作解决方案。它可以通过几种方式进行优化,但正如它所写的那样,对于需要这样做的其他人来说应该非常清楚:

    #include <sys/types.h>
    #include <sys/select.h>
    #include <sys/time.h>
    #include <sys/types.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <fcntl.h>
    #include <unistd.h>


    int readPipe(int fd)
    {
        ssize_t bytes;
        size_t total_bytes = 0;
        char buffer[100*1024];

        printf("\nReading pipe descriptor # %d\n",fd);
        for(;;) {
            bytes = read(fd, buffer, sizeof(buffer));
            if (bytes > 0) {
                total_bytes += (size_t)bytes;
                printf("%s", buffer);
            } else {
                if (errno == EWOULDBLOCK) {
                    break;
                } else {
                    perror("read error");
                    return EXIT_FAILURE;
                }
            }
        }
        return EXIT_SUCCESS;
    }


    int main(int argc, char* argv[])
    {
        int fd_a, fd_b, fd_c;   // file descriptors for each pipe
        int nfd;                // select() return value
        fd_set read_fds;        // file descriptor read flags
        struct timeval tv;
        tv.tv_sec = 0;
        tv.tv_usec = 0;

        // create pipes to monitor (if they don't already exist)
        system("mkfifo /tmp/PIPE_A");
        system("mkfifo /tmp/PIPE_B");
        system("mkfifo /tmp/PIPE_C");

        // open file descriptors of named pipes to watch
        fd_a = open("/tmp/PIPE_A", O_RDWR | O_NONBLOCK);
        if (fd_a == -1) {
            perror("open error");
            return EXIT_FAILURE;
        }

        fd_b = open("/tmp/PIPE_B", O_RDWR | O_NONBLOCK);
        if (fd_b == -1) {
            perror("open error");
            return EXIT_FAILURE;
        }

        fd_c = open("/tmp/PIPE_C", O_RDWR | O_NONBLOCK);
        if (fd_c == -1) {
            perror("open error");
            return EXIT_FAILURE;
        }

        for(;;)
        {
            // clear fds read flags
            FD_ZERO(&read_fds);

            // check if there is new data in any of the pipes
            // PIPE_A
            FD_SET(fd_a, &read_fds);
            nfd = select(fd_a+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select error");
                    return EXIT_FAILURE;
                }
                if (FD_ISSET(fd_a, &read_fds)) {
                    readPipe(fd_a);
                }
            }

            // PIPE_B
            FD_SET(fd_b, &read_fds);
            nfd = select(fd_b+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select error");
                    return EXIT_FAILURE;
                }
                if (FD_ISSET(fd_b, &read_fds)){
                    readPipe(fd_b);
                }
            }

            // PIPE_C
            FD_SET(fd_c, &read_fds);
            nfd = select(fd_c+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select error");
                    return EXIT_FAILURE;
                }
                if (FD_ISSET(fd_c, &read_fds)){
                    readPipe(fd_c);
                }
            }

            usleep(100000);
        }
        return EXIT_SUCCESS;
    }

【讨论】:

  • 在 Linux 下运行相同的代码不起作用... O_RDWR 标志返回 FIFO 的意外结果。 Mac 在运行 linux 时处理它但 O_RDONLY
【解决方案3】:

只是为了让您的代码更简单。您不需要三个选择。您可以通过三个调用 FD_SET()、调用 select 和 if nfd &gt; 0 来设置所有空闲文件描述符,检查每个 fd_x with FD_ISSET()

【讨论】:

    【解决方案4】:

    我使用了一个用于套接字编程的 sn-p,但它对命名管道的工作方式应该相同。它应该简单易懂。

    #include <cstdio>
    #include <cstdlib>
    #include <cstring>
    #include <cctype>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <sys/select.h>
    
    int main()
    {
      fd_set readSet, writeSet, exSet;
      struct timeval tv;
      int i;
    
      int fifoFds[3];
    
      //open files or named pipes and put them into fifoFds array
    
      while(1)
      {
    
        FD_ZERO(&readSet);
        FD_ZERO(&writeSet); //not used
        FD_ZERO(&exSet); //not used
    
        int maxfd = -1;
        for(i = 0; i < 3; i++)
        {
          if(maxfd == -1 || fifoFds[i] > maxfd) 
            maxfd = fifoFds[i];
    
          FD_SET(fifoFds[i], &readSet);
        }
    
        tv.tv_sec = 1; //wait 1 second in select, change these as needed
        tv.tv_usec = 0; //this is microseconds
    
        select(maxfd+1, &readSet, &writeSet, &exSet, &tv);
    
        for(i = 0; i < 3; i++)
        {
          if(FD_ISSET(fifoFds[i], &readSet))
          {
            //Read from that fifo now!
          }
        }
    
      }
    
      return 0;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-01-25
      • 2021-04-15
      • 2016-08-09
      • 1970-01-01
      • 2010-09-15
      • 2013-02-13
      • 1970-01-01
      相关资源
      最近更新 更多