【问题标题】:Multi-Producer Multi-Consumer data synchronization with separated queues多生产者多消费者数据同步与分离队列
【发布时间】:2019-08-05 12:55:06
【问题描述】:

我有以下场景:

  1. 可变数量(大于三个)的队列(取决于文件中设置的配置)
  2. 其中一些队列可以接收数据或不接收数据(这取决于通过网络客户端接收数据的生产者:客户端可以在同一会话期间连接或不连接)
  3. 这些队列以不同的速度提供;因此,例如,Queue1 在给定时间可以有 10 个对象,而另一个队列 Queue2 在同一给定时间只能有 3 个对象
  4. 这些队列中的对象必须根据它们共享的属性(一个不断增加的名为“SSId”的 int 属性)进行同步
  5. 必须仅对在给定时刻提供数据的队列进行同步(必须排除未连接的队列)
  6. 当对象同步时,它们必须被推送到相关消费者使用的相应输出队列:每个生产者都与特定消费者相关联
  7. 在上一步之后,每个消费者能够同时处理具有相同属性值的“SSId”的入队对象;
  8. 因此,最终结果应该是一个系统,即使每个生产者以不同的速度/速率生成数据,消费者也能够以相同的速率处理数据(根据已经提到的“SSId”属性同步)

为了给出更清晰的概念,有一个模式表示前面所述的流程:

请注意,SSid 大于 100 的新项目不会被推送到消费者队列中,因为其他队列中还没有对应的项目。

您能否建议一种使用 .NET TPL Dataflow 或 Rx.NET 创建这种同步的方法?到目前为止,我一直使用 TPL Dataflow 来实现简单的顺序管道,并且我希望得到有关如何处理此场景的反馈。 提前感谢您的任何建议。

【问题讨论】:

  • 什么触发了同步操作?如何检测未连接的生产者?
  • 同步操作的目的是在生产者开始填充队列时发生:每当所有连接的队列都被一个具有相同 SSId 的项目填充时,这些项目应该被发送到消费者队列。通过每次生产者断开连接(或再次连接)时引发的事件来检测未连接的生产者
  • ID 是否一直在增加?
  • 是的,在我们的例子中,ID 被定义为 long 并且不断增加一:1、2、3、...

标签: task-parallel-library reactive-programming system.reactive tpl-dataflow rx.net


【解决方案1】:

怎么样

  1. 将所有生产者的对象合并到一个可观察对象中
  2. 按 SSId 对对象进行分组
  3. 当组大小等于生产者数量时(通过 .Buffer())发出组

像这样:

var syncedProducers = 
    // ConnectedProducersEvent ticks an array of connected producers, each time a producer connects or disconnects
    ConnectedProducersEvent
        .SelectMany(producers => 
            Observable
                .Merge(producers) // Put all objects, from all producers into the same observable
                .GroupBy(@object => @object.SSId) // Group objects by matching SSId
            .SelectMany(group => group.Buffer(producers.Length))); // Syncing: Emit the SSId group, when the group count matches the count of connected producers

// Now you can wire syncedProducers to consumers
var consumer1 = 
    syncedProducers
        .Select(x => x.Where(y => y.Producer == 1));

You can run the example on dotnetfiddle

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-02-11
    • 2013-04-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多