【发布时间】:2015-05-15 19:52:56
【问题描述】:
我不确定的情况涉及“线程安全”管道流的使用,其中多个线程可以添加要写入的消息。如果没有要写入的消息队列,则当前线程将开始写入读取方。如果有队列,并且在管道写入时队列增长,我希望开始写入的线程耗尽队列。
我“希望”这种设计(如下所示)阻止 SemaphoreSlim 的连续进入/释放并减少计划任务的数量。我说“希望”是因为我应该测试这种并发症是否对性能有任何积极影响。但是,在进行测试之前,我应该首先了解代码是否按照我的想法执行,所以请考虑以下类,以及它下面的一系列事件;
注意:我了解任务的执行与任何特定线程无关,但我发现这是最简单的解释方式。
class SemaphoreExample
{
// Wrapper around a NamedPipeClientStream
private readonly MessagePipeClient m_pipe =
new MessagePipeClient("somePipe");
private readonly SemaphoreSlim m_semaphore =
new SemaphoreSlim(1, 1);
private readonly BlockingCollection<Message> m_messages =
new BlockingCollection<Message>(new ConcurrentQueue<Message>());
public Task Send<T>(T content)
where T : class
{
if (!this.m_messages.TryAdd(new Message<T>(content)))
throw new InvalidOperationException("No more requests!");
Task dequeue = TryDequeue();
return Task.FromResult(true);
// In reality this class (and method) is more complex.
// There is a similiar pipe (and wrkr) in the other direction.
// The "sent jobs" is kept in a dictionary and this method
// returns a task belonging to a completionsource tied
// to the "sent job". The wrkr responsible for the other
// pipe reads a response and sets the corresponding
// completionsource.
}
private async Task TryDequeue()
{
if (!this.m_semaphore.Wait(0))
return; // someone else is already here
try
{
Message message;
while (this.m_messages.TryTake(out message))
{
await this.m_pipe.WriteAsync(message);
}
}
finally { this.m_semaphore.Release(); }
}
}
- Wrkr1 完成对管道的写入。 (在 TryDequeue 中)
- Wrkr1 确定队列为空。 (在 TryDequeue 中)
- Wrkr2 将项目添加到队列中。 (在发送中)
- wrkr2判断wrkr1占用Semaphore,返回。 (在发送中)
- Wrkr1 释放信号量。 (在 TryDequeue 中)
- 队列中有 1 项在 x 时间内不会被执行。
这一系列事件可能吗?我是否应该完全忘记这个想法,让每个“发送”呼叫等待“TryDeque”和其中的信号量?也许每个方法调用调度另一个任务的潜在性能影响可以忽略不计,即使在“高”频率下也是如此。
更新:
按照 Alex 的建议,我正在执行以下操作; 让“Send”的调用者指定一个“maxWorkload”整数,该整数指定调用者在将工作委派给另一个线程以处理任何额外工作之前准备执行多少项(对于其他调用者,在最坏的情况下)。但是,在创建新线程之前,“Send”的其他调用者有机会进入信号量,从而可能阻止使用额外的线程。
为了不让任何工作留在队列中,任何成功进入信号量并做了一些工作的工作人员必须检查退出信号量后是否有任何新工作添加。如果这是真的,同一个工人将尝试重新进入(如果未达到“maxWorkload”)或如上所述委派工作。
下面的示例:Send 现在将“TryPool”设置为“TryDequeue”的延续。 “TryPool”仅在“TryDequeue”返回 true 时开始(即在输入信号量时做了一些工作)。
// maxWorkload cannot be -1 for this method
private async Task<bool> TryDequeue(int maxWorkload)
{
int currWorkload = 0;
while (this.m_messages.Count != 0 && this.m_semaphore.Wait(0))
{
try
{
currWorkload = await Dequeue(currWorkload, maxWorkload);
if (currWorkload >= maxWorkload)
return true;
}
finally
{
this.m_semaphore.Release();
}
}
return false;
}
private Task TryPool()
{
if (this.m_messages.Count == 0 || !this.m_semaphore.Wait(0))
return Task<bool>.FromResult(false);
return Task.Run(async () =>
{
do
{
try
{
await Dequeue(0, -1);
}
finally
{
this.m_semaphore.Release();
}
}
while (this.m_messages.Count != 0 && this.m_semaphore.Wait(0));
});
}
private async Task<int> Dequeue(int currWorkload, int maxWorkload)
{
while (currWorkload < maxWorkload || maxWorkload == -1)
{
Message message;
if (!this.m_messages.TryTake(out message))
return currWorkload;
await this.m_pipe.WriteAsync(message);
currWorkload++;
}
return maxWorkload;
}
【问题讨论】:
-
如果您只是将管道包裹在锁中,那么锁本质上将是一个队列,所有这些都是为您管理的。使用 SemaphoreSlim.WaitAsync 获得异步、非阻塞行为。这不会消除对手动维护队列的需要吗?
-
@usr 我曾希望如此,但肯定不是:“如果多个线程被阻塞,则没有保证的顺序,例如 FIFO 或 LIFO,可以控制线程何时进入信号量。” - msdn.microsoft.com/en-us/library/…
标签: c# asynchronous async-await semaphore