【问题标题】:Possibility of Semaphoreslim.Wait(0) (to prevent multiple execution) causing non executionSemaphoreslim.Wait(0)(防止多次执行)导致不执行的可能性
【发布时间】:2015-05-15 19:52:56
【问题描述】:

我不确定的情况涉及“线程安全”管道流的使用,其中多个线程可以添加要写入的消息。如果没有要写入的消息队列,则当前线程将开始写入读取方。如果有队列,并且在管道写入时队列增长,我希望开始写入的线程耗尽队列。

我“希望”这种设计(如下所示)阻止 SemaphoreSlim 的连续进入/释放并减少计划任务的数量。我说“希望”是因为我应该测试这种并发症是否对性能有任何积极影响。但是,在进行测试之前,我应该首先了解代码是否按照我的想法执行,所以请考虑以下类,以及它下面的一系列事件;

注意:我了解任务的执行与任何特定线程无关,但我发现这是最简单的解释方式。

class SemaphoreExample
{
    // Wrapper around a NamedPipeClientStream
    private readonly MessagePipeClient m_pipe =
        new MessagePipeClient("somePipe");

    private readonly SemaphoreSlim m_semaphore =
        new SemaphoreSlim(1, 1);

    private readonly BlockingCollection<Message> m_messages =
        new BlockingCollection<Message>(new ConcurrentQueue<Message>());

    public Task Send<T>(T content)
        where T : class
    {
        if (!this.m_messages.TryAdd(new Message<T>(content)))
            throw new InvalidOperationException("No more requests!");

        Task dequeue = TryDequeue();

        return Task.FromResult(true);
        // In reality this class (and method) is more complex.
        // There is a similiar pipe (and wrkr) in the other direction.
        // The "sent jobs" is kept in a dictionary and this method
        // returns a task belonging to a completionsource tied
        // to the "sent job". The wrkr responsible for the other
        // pipe reads a response and sets the corresponding
        // completionsource.
    }

    private async Task TryDequeue()
    {
        if (!this.m_semaphore.Wait(0))
            return; // someone else is already here

        try
        {
            Message message;
            while (this.m_messages.TryTake(out message))
            {
                await this.m_pipe.WriteAsync(message);
            }
        }
        finally { this.m_semaphore.Release(); }
    }
}
  1. Wrkr1 完成对管道的写入。 (在 TryDequeue 中)
  2. Wrkr1 确定队列为空。 (在 TryDequeue 中)
  3. Wrkr2 将项目添加到队列中。 (在发送中)
  4. wrkr2判断wrkr1占用Semaphore,返回。 (在发送中)
  5. Wrkr1 释放信号量。 (在 TryDequeue 中)
  6. 队列中有 1 项在 x 时间内不会被执行。

这一系列事件可能吗?我是否应该完全忘记这个想法,让每个“发送”呼叫等待“TryDeque”和其中的信号量?也许每个方法调用调度另一个任务的潜在性能影响可以忽略不计,即使在“高”频率下也是如此。

更新:

按照 Alex 的建议,我正在执行以下操作; 让“Send”的调用者指定一个“maxWorkload”整数,该整数指定调用者在将工作委派给另一个线程以处理任何额外工作之前准备执行多少项(对于其他调用者,在最坏的情况下)。但是,在创建新线程之前,“Send”的其他调用者有机会进入信号量,从而可能阻止使用额外的线程。

为了不让任何工作留在队列中,任何成功进入信号量并做了一些工作的工作人员必须检查退出信号量后是否有任何新工作添加。如果这是真的,同一个工人将尝试重新进入(如果未达到“maxWorkload”)或如上所述委派工作。

下面的示例:Send 现在将“TryPool”设置为“TryDequeue”的延续。 “TryPool”仅在“TryDequeue”返回 true 时开始(即在输入信号量时做了一些工作)。

    // maxWorkload cannot be -1 for this method
    private async Task<bool> TryDequeue(int maxWorkload)
    {
        int currWorkload = 0;
        while (this.m_messages.Count != 0 && this.m_semaphore.Wait(0))
        {
            try
            {
                currWorkload = await Dequeue(currWorkload, maxWorkload);
                if (currWorkload >= maxWorkload)
                    return true;
            }
            finally
            {
                this.m_semaphore.Release();
            }
        }
        return false;
    }

    private Task TryPool()
    {
        if (this.m_messages.Count == 0 || !this.m_semaphore.Wait(0))
            return Task<bool>.FromResult(false);

        return Task.Run(async () =>
        {
            do
            {
                try
                {
                    await Dequeue(0, -1);
                }
                finally
                {
                    this.m_semaphore.Release();
                }
            }
            while (this.m_messages.Count != 0 && this.m_semaphore.Wait(0));
        });
    }

    private async Task<int> Dequeue(int currWorkload, int maxWorkload)
    {
        while (currWorkload < maxWorkload || maxWorkload == -1)
        {
            Message message;
            if (!this.m_messages.TryTake(out message))
                return currWorkload;

            await this.m_pipe.WriteAsync(message);

            currWorkload++;
        }
        return maxWorkload;
    }

【问题讨论】:

  • 如果您只是将管道包裹在锁中,那么锁本质上将是一个队列,所有这些都是为您管理的。使用 SemaphoreSlim.WaitAsync 获得异步、非阻塞行为。这不会消除对手动维护队列的需要吗?
  • @usr 我曾希望如此,但肯定不是:“如果多个线程被阻塞,则没有保证的顺序,例如 FIFO 或 LIFO,可以控制线程何时进入信号量。” - msdn.microsoft.com/en-us/library/…

标签: c# asynchronous async-await semaphore


【解决方案1】:

我倾向于将这种模式称为“GatedBatchWriter”,即通过门的第一个线程处理一批任务;它自己和其他一些作家代表其他作家,直到它完成了足够的工作。

这种模式主要是有用的,当批处理工作更有效时,因为与该工作相关的开销。例如。一次将较大的块写入磁盘,而不是多个小块。

是的,这个特定的模式有一个特定的竞争条件需要注意:“负责任的写入者”,即通过门的那个,确定队列中没有更多消息并在释放信号量之前停止(即它的写责任)。第二个作家到达,在这两个决策点之间未能获得写入责任。现在队列中有一条消息不会被传递(或延迟传递,当下一个写入者到达时)。

此外,就日程安排而言,您现在正在做的事情是不公平的。如果有很多消息,队列可能永远不会是空的,通过门的写入者将永远忙于代表其他人写入消息。您需要限制负责编写者的批量大小。

您可能想要更改的其他一些内容是:

  1. 让您的Message 包含任务完成令牌。
  2. 让无法获得写入职责的写入者将他们的消息排入队列并等待两个任务完成中的任何一个:与他们的消息关联的任务完成,写入职责的释放。
  3. 让负责的编写者为其处理的消息设置完成。
  4. 让负责的作者在完成足够的工作后释放其编写责任。
  5. 当等待的写入器被两个任务完成之一唤醒时:
    • 如果它是由于其消息中的完成标记,它可以顺利进行。
    • 否则,尝试获得写入责任,冲洗,重复...

另外注意:如果消息很多,即平均消息负载很高,处理队列的专用线程/长时间运行的任务通常会具有更好的性能。

【讨论】:

  • 好点,我会考虑所有的。我有一个关于不公平的后续问题。如果“Send”的调用者没有等待响应,而是继续进行一些异步工作,“TryDequeue”中的“等待”是否可能允许池中的新线程接管。 (这是我对 async/await 的理解。)
  • @LaFleur,await 之后的延续是否在不同的线程池线程上并不重要。发送消息并首先通过门的应用程序任务将等待,只要队列中有消息,然后在发送自己的消息后恢复它应该执行的操作。
猜你喜欢
  • 2013-03-11
  • 2018-02-13
  • 2010-09-30
  • 2012-12-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-04-19
  • 1970-01-01
相关资源
最近更新 更多