【问题标题】:Consumer/Producer with order and constraint on consumed items消费者/生产者对消费项目有订单和约束
【发布时间】:2012-12-11 11:32:34
【问题描述】:

我有以下场景

  • 我正在编写一个处理文件(作业)的服务器
    • 一个文件有一个“前缀”和一个时间
    • 文件要按时间处理(旧文件优先)但也要考虑前缀(不能同时处理具有相同前缀的文件)
  • 我有一个线程(带定时器的任务)监视目录并将文件添加到“队列”(生产者)
  • 我有几个从“队列”(消费者)获取文件的消费者——他们应该符合上述规则。
    • 每个任务的作业都保存在某个列表中(这表明了约束)
  • 有多个消费者,消费者数量在启动时确定。

其中一个要求是能够优雅地停止消费者(立即或让正在进行的进程完成)。

我沿着这条线做了一些事情:

while (processing)
{
    //limits number of concurrent tasks
    _processingSemaphore.Wait(queueCancellationToken);  
    //Take next job when available or wait for cancel signal
    currentwork = workQueue.Take(taskCancellationToken);

    //check that it can actually process this work
    if (CanProcess(currnetWork)
    { 
        var task = CreateTask(currentwork)
        task.ContinueWith((t) => { //release processing slot });
    }
    else
       //release slot, return job? something else?
 }

取消标记源位于调用方代码中,可以取消。有两个是为了能够在不取消正在运行的任务的同时停止排队。

我厌倦了将“队列”实现为包装“安全”SortedSet 的 BlockingCollection。除了我需要找到与约束匹配的新工作的情况外,一般的想法工作(按时间排序)。如果我将工作返回队列并尝试再次接受,我将得到相同的。

可以从队列中取出作业,直到我找到合适的作业,然后返回“非法”作业,但这可能会导致其他消费者处理乱序作业时出现问题

另一种选择是传递一个简单的集合和一种锁定它的方法,然后根据当前的约束锁定并进行简单的搜索。同样,这意味着编写可能不是线程安全的代码。

还有其他可以提供帮助的建议/指针/数据结构吗?

【问题讨论】:

  • 最早可以处理此类作业的时间是第一个作业完成时。所以工人也可以处理下一个。因此,将作业列表放入队列中,而不仅仅是作业,列表中的每个作业都有相同的前缀。
  • @HansPassant - 我考虑过 - 有一些每个进程的本地队列,但不确定这不是 100% 正确 - 拥有相同的消费者队列下一个具有相同前缀的作业可能会违反处理方式-时间要求 - 我需要考虑一下,看看它是否适用于所有约束。无论如何-感谢您的意见。
  • @HansPassant - 你指出的确实是正确的。我最终在每个“专业”消费者中实现了一个本地队列,并且需要在他们的队列上“本地”排队额外的作业。如果消费者“错过”了排队的文件(例如,在消费者存在处理循环后完成的时间之间添加了作业),它将返回到一般队列并由任何可用的消费者处理。我可以将评论标记为答案吗?
  • @TomerCagan 请看我编辑的answer。希望它有助于简化您的解决方案。

标签: c#-4.0 concurrency task-parallel-library concurrent-programming


【解决方案1】:

我认为 Hans 是对的:如果你已经有一个线程安全的 SortedSet(它实现了IProducerConsumerCollection,所以它可以在BlockingCollection 中使用),那么你只需要放置可以正确处理的文件现在进入收藏。如果您完成的文件使另一个文件可用于处理,请在此时将另一个文件添加到集合中,而不是更早。

【讨论】:

    【解决方案2】:

    我会用TPL Dataflow 实现您的要求。看看你可以用它实现Producer-Consumer pattern 的方式。我相信这将满足您的所有要求(包括对消费者的取消)。

    编辑(适合那些不喜欢阅读文档,但喜欢阅读的人...)

    这是一个示例,说明如何使用 TPL Dataflow 实现要求。这种实现的美妙之处在于,消费者没有绑定到单个线程,仅在需要处理数据时才使用池线程。

        static void Main(string[] args)
        {
            BufferBlock<string> source = new BufferBlock<string>();
            var cancellation = new CancellationTokenSource();
            LinkConsumer(source, "A", cancellation.Token);
            LinkConsumer(source, "B", cancellation.Token);
            LinkConsumer(source, "C", cancellation.Token);
    
            // Link an action that will process source values that are not processed by other 
            source.LinkTo(new ActionBlock<string>((s) => Console.WriteLine("Default action")));
    
            while (cancellation.IsCancellationRequested == false)
            {
                ConsoleKey key = Console.ReadKey(true).Key;
                switch (key)
                {
                    case ConsoleKey.Escape:
                        cancellation.Cancel();
                        break;
                    default:
                        Console.WriteLine("Posted value {0} on thread {1}.", key, Thread.CurrentThread.ManagedThreadId);
                        source.Post(key.ToString());
                        break;
                }
            }
    
            source.Complete();
            Console.WriteLine("Done.");
            Console.ReadLine();
        }
    
        private static void LinkConsumer(ISourceBlock<string> source, string prefix, CancellationToken token)
        {
            // Link a consumer that will buffer and process all input of the specified prefix
            var consumer = new ActionBlock<string>(new Action<string>(Process), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1, SingleProducerConstrained = true, CancellationToken = token, TaskScheduler = TaskScheduler.Default });
            var linkDisposable = source.LinkTo(consumer, (p) => p == prefix);
    
            // Dispose the link (remove the link) when cancellation is requested.
            token.Register(linkDisposable.Dispose);
        }
    
        private static void Process(string arg)
        {
            Console.WriteLine("Processed value {0} in thread {1}", arg, Thread.CurrentThread.ManagedThreadId);
    
            // Simulate work
            Thread.Sleep(500);
        }
    

    【讨论】:

    • 这并没有解释如何以任何方式解决问题。
    • @svick 在问题的最后被问到“任何其他可以提供帮助的建议/指针/数据结构?”这正是我给他的。使用 Dataflow 满足要求,但前提是您需要阅读我提供的文档链接。在我看来,答案不应该灌输或重新记录任何东西,而应该只提供指导或解决未记录的问题。
    • 但是 TPL 数据流不包含任何可以直接解决问题的内容。这不是一个简单的生产者-消费者场景,因此您的链接本身不会很有帮助。
    • @svick 实际上确实解决了问题(并且更有效)。抱歉回复晚了,但我不喜欢在假期工作。请查看我的编辑。
    • 但是如果你这样做并且有类似aaaaab的东西,那么b将不会被处理而前四个as正在被处理,所以它可以非常无效。这就是为什么我说“使用 TPL 数据流”不是很有帮助:它没有说明您应该如何使用它。回答这个问题并非易事。
    猜你喜欢
    • 1970-01-01
    • 2017-02-24
    • 2015-04-05
    • 1970-01-01
    • 2011-11-08
    • 1970-01-01
    • 2011-07-23
    • 2020-09-21
    • 1970-01-01
    相关资源
    最近更新 更多