【问题标题】:Message Routing, Partitioning and Parallel Processing消息路由、分区和并行处理
【发布时间】:2019-05-16 11:13:01
【问题描述】:

我正在尝试解决一组传入消息的分区和并行化问题。我想根据用户 id 进行分区,用户 id 是一个整数。

所以假设我有十万用户并且我想要 X 个执行线程或分区,我希望能够创建具有分区规则的 X 个线程,例如 userid 100000

如何实现一种算法来验证 C#(或任何 .NET 语言,如 F#)中的分区规则?

所以如果我有 10 个 id 为 (1,2,3,4,5,6,7,8,9,10) 的用户,那么线程总数必须只有 10 个用户 id,并且没有重复的用户 id超过 1 个线程

感谢您迄今为止的回复,但我想澄清一下,我并没有考虑在所有现有用户之间平均分配。相反,我希望能够创建一个分区规则表来定义应该如何进行分区。该表可能如下所示:

Thread 1: UserId > 0 < 100
Thread 2: UserId >= 100 < 200
Thread 3: UserId > 300

我想要的是一种算法,可以验证这些规则集是否详尽(涵盖所有用户 ID)并且不包含重复项(不会将用户 ID 分配给多个线程)

【问题讨论】:

  • Parallel.ForEachParallel.For 已经对输入数据进行了分区。分区可以同时处理fixed and dynamic sizes, and use load balancing - 即,如果一个分区最终进行了大量“繁重”处理,则其余数据可以转到其他工作任务。
  • 您的实际问题是什么? How can I implement an algorithm to validate the partitioning rules 应该很容易 - 对数据进行分区并检查每个分区的内容是否违反规则。如果问题是如何确保线程获得正确的分区,什么线程?您还没有解释如何处理数据。使用 Parallel.ForEach?普林克?你自己的代码?还有什么?
  • 您是否尝试将消息路由到具有不同优先级的不同工作人员?这不是分区。
  • @PanagiotisKanavos,你是对的。这包括优先级和消息路由。我可能搞砸了术语。我已经编辑了标题以增加清晰度。

标签: c# .net f# set-theory


【解决方案1】:

我认为实现这种分区逻辑的最简单方法是通过模除法进行分区。 IE。如果您有 n 个分区来分配您的用户 - 只需将 UserId 除以 n 即可。这将为您提供 0..(n-1) 的余数,并且通过该余数,每个 UserId 都会自动映射到您的 n 个分区(线程)之一。

如果 UserId 是均匀分布的,那么这种分区方案甚至还会在统计上将所有用户均匀分布在现有分区上。

而且这种分配方案根据定义是详尽无遗的,因为这样的模除法只能剩下 0..(n-1),并且每个 UserId 都有一个这样的模除法的定义结果。

【讨论】:

    【解决方案2】:

    下面的代码将根据您有多少消息以及您想要多少组将消息列表划分为组。

    private static IEnumerable<IEnumerable<Message>> GetPartitionedMessages(IEnumerable<Message> messages, int nPartitions)
    {
        var orderedMessages = messages.OrderBy(x => x.UserId).ThenBy(x => x.MessageId).ToList();
        int? lastUserId = null;
        int maxPartitionSize = (int)Math.Ceiling(orderedMessages.Count / (double)nPartitions);
        var partitions = new List<List<Message>>();
        List<Message> currentPartition = null;
    
        foreach (var message in orderedMessages)
        {
            if (lastUserId == message.UserId)
            {
                currentPartition.Add(message);
            }
            else
            {
                lastUserId = message.UserId;
                if (currentPartition == null || currentPartition.Count >= maxPartitionSize)
                {
                    currentPartition = new List<Message>();
                    partitions.Add(currentPartition);
                }
    
                currentPartition.Add(message);
            }
        }
    
        return partitions;
    }
    

    此示例应用程序使用该函数,然后对每条消息执行模拟处理函数。每个组(即每个线程将处理的列表)永远不会包含来自另一个列表的用户 ID。示例应用程序向控制台打印一条消息,其中包含哪个线程正在处理哪个消息以及每条消息的 ID 和用户 ID 是什么。

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    
    namespace ConsoleApp16
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("User IDs 1, 2, 3");
                ProcessMessages(GetTestMessages(1, 2, 3), 4);
    
                Console.WriteLine("User IDs empty");
                ProcessMessages(GetTestMessages(), 4);
    
                Console.WriteLine("User IDs 1, 2, 3, 4, 5, 6, 7, 8, 9, 10");
                ProcessMessages(GetTestMessages(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 4);
    
                Console.WriteLine("User IDs 2, 2, 2, 1, 1, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 6, 7, 8, 9, 10");
                ProcessMessages(GetTestMessages(2, 2, 2, 1, 1, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 6, 7, 8, 9, 10), 4);
    
                Console.ReadLine();
            }
    
            private static IEnumerable<Message> GetTestMessages(params int[] userIds)
            {
                int i = 1;
                foreach (var userId in userIds)
                    yield return new Message { MessageId = i++, UserId = userId };
            }
    
            private class Message
            {
                public int MessageId { get; set; }
                public int UserId { get; set; }
                //... Real message properties
            }
    
            private static void ProcessMessages(IEnumerable<Message> incomingMessages, int nThreads)
            {
                var tasks = GetPartitionedMessages(incomingMessages, nThreads)
                             .Select((messages, i) => Task.Run(() => DoMessageBusinessLogic(messages, i)))
                             .ToArray();
                Task.WaitAll(tasks);
            }
    
            private static void DoMessageBusinessLogic(IEnumerable<Message> messages, int threadIdx)
            {
                foreach (var message in messages)
                    Console.WriteLine($"Thread ID: {threadIdx}, MsgId: {message.MessageId}, UserId: {message.UserId}");
            }
    
            private static IEnumerable<IEnumerable<Message>> GetPartitionedMessages(IEnumerable<Message> messages, int nPartitions)
            {
                var orderedMessages = messages.OrderBy(x => x.UserId).ThenBy(x => x.MessageId).ToList();
                int? lastUserId = null;
                int maxPartitionSize = (int)Math.Ceiling(orderedMessages.Count / (double)nPartitions);
                var partitions = new List<List<Message>>();
                List<Message> currentPartition = null;
    
                foreach (var message in orderedMessages)
                {
                    if (lastUserId == message.UserId)
                    {
                        currentPartition.Add(message);
                    }
                    else
                    {
                        lastUserId = message.UserId;
                        if (currentPartition == null || currentPartition.Count >= maxPartitionSize)
                        {
                            currentPartition = new List<Message>();
                            partitions.Add(currentPartition);
                        }
    
                        currentPartition.Add(message);
                    }
                }
    
                return partitions;
            }
        }
    }
    

    【讨论】:

    • 看起来 op 的要求有所不同。问题似乎是“如何验证我的分区规则”
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-06-04
    • 2016-04-18
    • 1970-01-01
    • 2010-10-12
    • 2014-12-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多