【问题标题】:Canonical way to "broadcast" data to multiple processes in linux?将数据“广播”到Linux中的多个进程的规范方法?
【发布时间】:2016-03-26 04:57:39
【问题描述】:

我有一个应用程序需要将数据流从一个进程发送到多个读取器,每个读取器都需要查看自己的流副本。这是相当高的速率(100MB/s 并不少见),所以我想尽可能避免重复。在我的理想世界中,linux 会命名管道支持多个读取器,并为常见的单读取器情况提供快速路径。

我想要一些能够提供某种命名空间隔离措施的东西(例如:127.0.0.1 上的广播对我认为的任何进程都是开放的......)。 Unix域套接字不支持广播,无论如何UDP都是“不可靠的”(在我的情况下,服务器将丢弃数据包而不是阻塞)。我想我可以创建一个共享内存段并将公共缓冲区存储在那里,但这感觉就像重新发明轮子一样。在 linux 中有没有规范的方法可以做到这一点?

【问题讨论】:

  • 某种“发布订阅”机制?有很多可供选择。
  • 广播对你毫无帮助。您不能广播到同一个盒子上的多个端口。共享内存将是最快的,但挑战将是“何时删除数据”。那里有几种策略。

标签: linux sockets


【解决方案1】:

我想我可以创建一个共享内存段并将公共缓冲区存储在那里,但这感觉就像重新发明轮子一样。在 linux 中有没有规范的方法可以做到这一点?

简短的回答:

长答案:是的 [而且你在正确的轨道上]

我以前 [为了更高的速度] 必须这样做,所以我不得不对此进行研究。以下是我想出的。

在主进程中,创建一个共享缓冲区池[使用您选择的 SysV shm 或私有 mmap]。为他们分配 ID 号(例如 1、2、3、...)。现在有一个从bufid 到缓冲内存地址的映射。要使子进程可以访问它,请在分叉它们之前执行此操作。孩子也继承了共享内存映射,所以工作不多

现在 fork 孩子们。给他们每个人一个唯一的进程ID。您可以从一个数字开始逐步增加:2,3,4,... [main is 1] 或只使用常规 pid。

打开一个 SysV 消息通道 (msgget et. al.)。同样,如果您在分叉之前的主进程中执行此操作,则它们可供子 [IIRC] 使用。


下面是它的工作原理:

main 找到一个未使用的缓冲区并填充它。对于每个孩子,main 通过msgsnd(在单个公共 IPC 通道上)发送 IPC 消息,其中消息有效负载 [mtext] 是 bufid 号。每条消息都将标准标头的 mtype 字段设置为目标子进程的 pid。

完成此操作后,main 将缓冲区记住为“正在运行”且尚未可重用。

每个孩子都会执行一个msgrcv,并将mtype 设置为其pid。然后它从mtext 中提取bufid 并处理缓冲区。完成后,它会发送一条 IPC 消息 [再次在同一频道上],并将 mtype 设置为 main 的 pid,并带有刚刚处理的 bufid 的 mtext

main 的循环执行非阻塞msgrcv,记录给定 bufid 的所有“释放”消息。当所有孩子都释放了缓冲区时,它被放回缓冲区“空闲队列”中。在 main 的服务循环中,它可能会填充新的缓冲区并酌情发送更多消息[穿插在等待中]。

然后孩子执行 msgrcv 并重复循环。

因此,我们正在使用 [大] 共享内存缓冲区和短 [几个字节] bufid 描述符 IPC 消息。


好的,那么您可能会问的问题是:“为什么 SysV IPC 用于通信频道?” [vs.多个管道或套接字]。

您已经知道共享缓冲区可以避免发送数据的多个副本。

所以,这就是要走的路。但是,为什么不通过套接字或管道 [或共享队列、条件变量、互斥锁等] 发送上述 bufid 消息?

答案是速度和目标进程的唤醒特性。

对于高度实时的响应,当 main 发出 bufid 消息时,您希望目标进程 [如果它一直在休眠]立即唤醒并开始处理缓冲区。

我检查了 linux 内核源代码,唯一 具有该特性的机制是 SysV IPC。所有其他人都有[调度]滞后。

当进程 A 在进程 B 已执行 msgrcv 的通道上执行 msgsnd 时,将发生三件事:

  1. 进程 B 将被调度程序标记为可运行。
  2. [IIRC] B 将被移到其调度队列的前面
  3. 此外,更重要的是,这会导致立即重新安排所有进程。

B 将立即启动 [与下一个定时器中断或其他进程恰好休眠相反]。在单核机器上,A 将进入睡眠状态,B 将代替它运行。

警告:我所有的研究都是在 CFS 调度程序之前几年完成的,但是,我相信上述内容应该仍然成立。另外,我使用的是 RT 调度程序,如果 CFS 无法按预期工作,这可能是一个可能的选择。


更新:

查看 POSIX 消息队列源,我认为您在 System V 队列中讨论的相同立即唤醒行为正在发生,这带来了 POSIX 兼容性的额外好处。

时间语义是可能的[并且是可取的],所以我不会感到惊讶。但是,SysV 实际上比 POSIX mqueues 更标准、更普遍。而且,存在一些语义差异[见下文]。

对于计时,您可以构建一个带有 nsec 时间戳的单元测试程序 [仅使用 msgs]。我使用了 TSC 邮票,但clock_gettime(CLOCK_REALTIME,...) 也可能有用。戳出发时间和到达/起床时间查看。比较 SysV 和 mq

使用 SysV 或 mq,您可能需要通过 /proc/* 提高最大消息数量、最大消息大小、最大队列数量。默认值相对较小。如果你不这样做,你可能会发现任务被阻塞等待一个 msg,但由于超出了 msg 队列最大参数,master 无法发送一个 [被阻塞]。我实际上有这样一个错误,所以我更改了我的代码以在启动期间提高这些值 [它以 root 身份运行]。因此,您可能需要将其作为 RC 启动脚本(或任何 [atrocious ;-)] systemd 等效项)

我查看了在我自己的代码中使用 mq 替换 SysV。对于多对一返回免费池消息,它没有相同的语义。在我最初的回答中,我忘了提到需要 两个 msg 队列:master-to-children(例如工作待办事项)和 children-to-master(例如返回一个现在可用的缓冲区)。

我有几种不同类型的缓冲区(例如压缩视频、压缩音频、未压缩视频、未压缩音频),它们具有不同的类型和结构描述符。

此外,多个不同的缓冲区队列,因为这些缓冲区从线程传递到线程[不同的处理阶段]。

使用 SysV,您可以将单个 msg 队列用于多个缓冲区列表/队列,缓冲区列表 ID 是 msg mtype。一个子 msgrcv 等待将 mtype 设置为 ID 值。 master 等待返回到空闲的 msg 队列,mtype 为 0。

mq* 要求每个 ID 有一个单独的 mqd_t,因为它不允许等待 msg 子类型。

msgrcv 允许 IPC_NOWAIT 在每个呼叫中​​使用,但要获得与 mq_receive 相同的效果,您必须使用 O_NONBLOCK 打开队列或使用定时版本。这在“关闭”或“重新启动”阶段使用(例如,向孩子发送一条消息,表示不会有更多数据到达,他们应该终止[或重新配置等])。 IPC_NOWAIT 可方便地在程序启动期间“排出”队列[以消除先前调用中的陈旧消息]或在操作期间从先前配置中排出陈旧消息。

因此,您需要为每个缓冲区列表/类型设置一个单独的mqd_t,而不是只使用两个 SysV 消息队列来处理任意数量的缓冲区列表。

【讨论】:

  • 这很有趣,我得进一步研究一下,看看它是否符合我的要求。
  • 该方法在 3 个商业产品中提供,用于广播质量高清实时 H.264 编码/解码。这些对正常运行时间有很高的要求(例如一年的 MTBF 和 300 毫秒或更短的重启/恢复时间)。他们还必须在数据阶段之间具有低延迟。
  • 查看 POSIX 消息队列源:lxr.free-electrons.com/source/ipc/mqueue.c认为您与 System V 队列讨论的相同立即唤醒行为正在发生,这给出了添加的POSIX 兼容性的好处。
  • 这种方法的缺点是崩溃或行为不端的进程可能会导致缓冲区永远不会被释放。
猜你喜欢
  • 2022-08-17
  • 2017-11-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-05-23
  • 1970-01-01
  • 2014-11-29
  • 1970-01-01
相关资源
最近更新 更多