【发布时间】:2019-09-24 14:30:37
【问题描述】:
我有这部分代码
bool hasData = true;
using (Context context = new Context())
{
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(MAX_THREADS))
{
while (hasData)
{
Message message = context.Database.SqlQuery<Message>($@"
select top(1) * from Message where
( Status = {(int)MessageStatusEnum.Pending} ) or
( Status = {(int)MessageStatusEnum.Paused } and ResumeOn < GETUTCDATE() )
").FirstOrDefault();
if message == null)
{
hasData = false;
}
else
{
concurrencySemaphore.Wait();
tasks.Add(Task.Factory.StartNew(() =>
{
Process(message);
concurrencySemaphore.Release();
}, this.CancellationToken));
}
}
Task.WaitAll(tasks.ToArray());
}
}
而我的 Process 函数是这样的
private void Process(Message message)
{
System.Threading.Thread.Sleep(10000);
}
现在,如果我只有 1 个项目要处理,则总执行时间为 10 秒,每个项目(1 个项目)的执行时间为 10 秒。 好吧,例如,如果我有 10 个项目,那么每个项目的执行时间将增加到 15-20 秒。
我尝试更改 MAX_THREADS 的值,但总是如果我的队列中有超过 10 个项目并开始并行执行,那么每个项目的执行时间约为 15 秒。
我错过了什么?
【问题讨论】:
-
您的“非并行”代码在哪里?我怀疑
Process是Thread.Sleep,因为您正在其中做一些数据库访问(因为没有从select top(1) * from Message收到相同的消息)... 真正的问题是什么,真正的代码在哪里? -
这是一个例子,你可以自己运行。我不是在寻找有关 Process 功能或选择查询的更好代码。我只是想知道,为什么如果我有很多项目正在运行,那么每个项目的执行速度会比我只有 1 个项目时慢。
-
这是一个示例,您可以自己运行。
context.Database.SqlQuery<Message> -
好的。我编辑了 sql 查询。现在更容易理解更深层次的问题了吗?
-
使用
Parallel.ForEach,不要尝试推出自己的等价物。而且,如果您尝试跨 cpus 或线程实现队列,则需要原子的“从队列中获取和删除”操作,而不是选择。如果它不跨 CPU/线程,为什么一次只得到一个?
标签: c# multithreading parallel-processing execution-time thread-synchronization