【发布时间】:2012-02-28 01:14:03
【问题描述】:
我有一个托管在 Windows 服务中的 WCF 服务。该服务公开了 2 个方法:
-
bool ProcessClaim(string options, ref string xml);将一些数据作为输入,进行一些处理(包括 IO 绑定操作,如 DB 查询),然后返回结果。 -
void RunJob(string ticket);立即返回。根据ticket,从存储(例如数据库或文件系统)读取输入数据,对每个数据元素进行相同的处理,并将结果保存回存储。批次通常包含许多声明。
用户可以调用ProcessClaim处理单个请求,调用RunJob运行批处理。多个批次可以同时运行。每个处理请求都包装为Task,因此所有请求都是并行执行的。
问题是不允许批处理通过调度大量请求来阻塞处理队列。换句话说,如果用户执行大批量,它将在相当长的时间内阻塞小批量和单个处理请求。
所以我想出了以下架构,Albahari 很好地描述了(非常简短):
public sealed class ProcessingQueue : IDisposable
{
private class WorkItem
{
public readonly TaskCompletionSource<string> TaskSource;
public readonly string Options;
public readonly string Claim;
public readonly CancellationToken? CancelToken;
public WorkItem(
TaskCompletionSource<string> taskSource,
string options,
string claim,
CancellationToken? cancelToken)
{
TaskSource = taskSource;
Options = options;
Claim = claim;
CancelToken = cancelToken;
}
}
public ProcessingQueue()
: this(Environment.ProcessorCount)
{
}
public ProcessingQueue(int workerCount)
{
_taskQ = new BlockingCollection<WorkItem>(workerCount * 2);
for (var i = 0; i < workerCount; i++)
Task.Factory.StartNew(Consume);
}
public void Dispose()
{
_taskQ.CompleteAdding();
}
private readonly BlockingCollection<WorkItem> _taskQ;
public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null)
{
var tcs = new TaskCompletionSource<string>();
_taskQ.Add(new WorkItem(tcs, options, claim, cancelToken));
return tcs.Task;
}
public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null)
{
return Task<string>.Factory.StartNew(() => ProcessItem(options, claim));
}
private void Consume()
{
foreach (var workItem in _taskQ.GetConsumingEnumerable())
{
if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
workItem.TaskSource.SetCanceled();
else
{
try
{
workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim));
}
catch (Exception ex)
{
workItem.TaskSource.SetException(ex);
}
}
}
}
private static string ProcessItem(string options, string claim)
{
// do some actual work here
Thread.Sleep(2000); // simulate work;
return options + claim; // return final result
}
}
静态方法ProcessRequest 可用于处理单个请求,而实例方法EnqueueTask - 用于批处理。当然,所有批次都必须使用ProcessingQueue 的单个共享实例。虽然这种方法效果很好,并且可以控制多个批次同时运行的速度,但我觉得有些问题:
- 必须手动维护工作线程池
- 难以猜测最佳工作线程数(我默认使用处理器内核数)
- 当没有批处理正在运行时,线程束保持阻塞,浪费系统资源
- 处理块工作线程的 IO 绑定部分会降低 CPU 使用效率
我想知道,有没有更好的方法来处理这种情况?
更新: 其中一个要求是为批处理提供全部功能,这意味着当用户执行一个批处理并且没有其他传入请求时,所有资源必须专用于处理该批处理。
【问题讨论】:
-
听起来您想以更一致的方式分配负载。我会看一个服务总线来做到这一点。 nservicebus.com
标签: c# wcf concurrency task-parallel-library