【发布时间】:2019-08-05 12:55:06
【问题描述】:
我有以下场景:
- 可变数量(大于三个)的队列(取决于文件中设置的配置)
- 其中一些队列可以接收数据或不接收数据(这取决于通过网络客户端接收数据的生产者:客户端可以在同一会话期间连接或不连接)
- 这些队列以不同的速度提供;因此,例如,Queue1 在给定时间可以有 10 个对象,而另一个队列 Queue2 在同一给定时间只能有 3 个对象
- 这些队列中的对象必须根据它们共享的属性(一个不断增加的名为“SSId”的 int 属性)进行同步
- 必须仅对在给定时刻提供数据的队列进行同步(必须排除未连接的队列)
- 当对象同步时,它们必须被推送到相关消费者使用的相应输出队列:每个生产者都与特定消费者相关联
- 在上一步之后,每个消费者能够同时处理具有相同属性值的“SSId”的入队对象;
- 因此,最终结果应该是一个系统,即使每个生产者以不同的速度/速率生成数据,消费者也能够以相同的速率处理数据(根据已经提到的“SSId”属性同步)
请注意,SSid 大于 100 的新项目不会被推送到消费者队列中,因为其他队列中还没有对应的项目。
您能否建议一种使用 .NET TPL Dataflow 或 Rx.NET 创建这种同步的方法?到目前为止,我一直使用 TPL Dataflow 来实现简单的顺序管道,并且我希望得到有关如何处理此场景的反馈。 提前感谢您的任何建议。
【问题讨论】:
-
什么触发了同步操作?如何检测未连接的生产者?
-
同步操作的目的是在生产者开始填充队列时发生:每当所有连接的队列都被一个具有相同 SSId 的项目填充时,这些项目应该被发送到消费者队列。通过每次生产者断开连接(或再次连接)时引发的事件来检测未连接的生产者
-
ID 是否一直在增加?
-
是的,在我们的例子中,ID 被定义为 long 并且不断增加一:1、2、3、...
标签: task-parallel-library reactive-programming system.reactive tpl-dataflow rx.net