【问题标题】:lio_listio: How to wait until all requests complete?lio_listio:如何等到所有请求完成?
【发布时间】:2013-01-24 11:55:56
【问题描述】:

在我的 C++ 程序中,我使用 lio_listio 调用一次发送许多(最多几百个)写入请求。之后,我做了一些计算,当我完成后,我需要等待所有未完成的请求完成,然后才能提交下一批请求。我该怎么做?

现在,我只是循环调用aio_suspend,每次调用一个请求,但这看起来很难看。看起来我应该使用struct sigevent *sevp 参数到lio_listio。我目前的猜测是我应该这样做:

  • 在主线程中,创建一个互斥体并在调用lio_listio之前将其锁定。
  • 在对lio_listio 的调用中,指定解锁此互斥体的通知函数/信号处理程序。

这应该会给我想要的行为,但它会可靠地工作吗?是否允许从信号处理程序上下文中操作互斥锁?我读到 pthread 互斥锁可以提供错误检测,如果您尝试从同一个线程再次锁定它们或从不同的线程解锁它们,则 pthread 互斥锁会失败,但此解决方案依赖于死锁。

示例代码,使用信号处理程序:

void notify(int, siginfo_t *info, void *) {
    pthread_mutex_unlock((pthread_mutex_t *) info->si_value);
}

void output() {
    pthread_mutex_t iomutex = PTHREAD_MUTEX_INITIALIZER;

    struct sigaction act;
    memset(&act, 0, sizeof(struct sigaction));
    act.sa_sigaction = &notify;
    act.sa_flags = SA_SIGINFO;
    sigaction(SIGUSR1, &act, NULL);

    for (...) {
        pthread_mutex_lock(&iomutex);

        // do some calculations here...

        struct aiocb *cblist[];
        int cbno;
        // set up the aio request list - omitted

        struct sigevent sev;
        memset(&sev, 0, sizeof(struct sigevent));
        sev.sigev_notify = SIGEV_SIGNAL;
        sev.sigev_signo = SIGUSR1;
        sev.sigev_value.sival_ptr = &iomutex;

        lio_listio(LIO_NOWAIT, cblist, cbno, &sev);
    }

    // ensure that the last queued operation completes
    // before this function returns
    pthread_mutex_lock(&iomutex);
    pthread_mutex_unlock(&iomutex);
}

示例代码,使用通知功能 - 可能效率较低,因为创建了一个额外的线程:

void output() {
    pthread_mutex_t iomutex = PTHREAD_MUTEX_INITIALIZER;

    for (...) {
        pthread_mutex_lock(&iomutex);

        // do some calculations here...

        struct aiocb *cblist[];
        int cbno;
        // set up the aio request list - omitted

        struct sigevent sev;
        memset(&sev, 0, sizeof(struct sigevent));
        sev.sigev_notify = SIGEV_THREAD;
        sev_sigev_notify_function = &pthread_mutex_unlock;
        sev.sigev_value.sival_ptr = &iomutex;

        lio_listio(LIO_NOWAIT, cblist, cbno, &sev);
    }

    // ensure that the last queued operation completes
    // before this function returns
    pthread_mutex_lock(&iomutex);
    pthread_mutex_unlock(&iomutex);
}

【问题讨论】:

    标签: linux posix aio


    【解决方案1】:

    如果您在 lio_listio() 调用中设置了 sigevent 参数,当该特定调用中的所有作业完成时,您将收到一个信号(或函数调用)通知。您仍然需要:

    1. 等到收到与发出 lio_listio() 调用一样多的通知后,才能知道它们何时全部完成。

    2. 使用某种安全机制从信号处理程序与主线程进行通信,可能通过全局变量(可移植)。

    如果您使用的是 linux,我建议您将 eventfd 绑定到您的 sigevent,然后等待。这要灵活得多,因为您不需要涉及信号处理程序。在 BSD(但不是 Mac OS)上,您可以使用 kqueue 在 aiocbs 上等待,而在 solaris/illumos 上,您可以使用端口来获取 aiocb 完成的通知。

    Here's an example如何在linux上使用eventfds:

    附带说明,在使用 lio_listio 发布作业时我会谨慎行事。您不能保证它支持超过2 jobs,并且某些系统对您一次可以发出的数量有非常低的限制。例如,Mac OS 上的默认值为 16。此限制可以定义为 AIO_LISTIO_MAX 宏,但不一定。在这种情况下,您需要调用 sysconf(_SC_AIO_LISTIO_MAX)(请参阅docs)。详情请见lio_listio documentation

    您至少应该检查 lio_listio() 调用中的错误情况。

    此外,您使用互斥锁的解决方案是次优的,因为您将同步 for 循环中的每个循环,并且一次只运行一个(除非它是递归互斥锁,但在这种情况下,它的状态可能已损坏如果您的信号处理程序恰好落在不同的线程上)。

    更合适的原语可能是信号量,它在处理程序中释放,然后(在你的 for 循环之后)获取与你循环相同的次数,调用 lio_listio()。但是,如果可以特定于 linux,我仍然会推荐一个 eventfd。

    【讨论】:

    • 感谢您提供信息丰富的帖子。同步每个循环是我想要的,因为我使用类似于双缓冲区的东西来存储计算数据 - 如果互斥锁解决方案没有其他问题,那么它应该可以满足我的目的。我不知道 lio_listio() 的这些限制 - 我将研究使用 mmaped 文件是否会更好。
    • 是的,在这种情况下,我想你的互斥锁解决方案可以正常工作。我唯一要担心的是在信号处理程序中释放互斥锁的模式,可能在当前在获取锁时被阻塞的线程上(对于信号处理程序安全系统调用的列表:pubs.opengroup.org/onlinepubs/009695399/functions/…) .您在传统上使用信号量的模式中使用互斥锁。
    • 确实,看起来 pthread_mutex_* 函数不是异步信号安全的,而 sem_post 是。
    猜你喜欢
    • 2015-04-17
    • 2015-07-17
    • 1970-01-01
    • 2020-03-17
    • 2020-05-26
    • 2021-09-01
    相关资源
    最近更新 更多