【问题标题】:Architecture around processing single and batch requests simultaneously围绕同时处理单个和批处理请求的架构
【发布时间】:2012-02-28 01:14:03
【问题描述】:

我有一个托管在 Windows 服务中的 WCF 服务。该服务公开了 2 个方法:

  1. bool ProcessClaim(string options, ref string xml); 将一些数据作为输入,进行一些处理(包括 IO 绑定操作,如 DB 查询),然后返回结果。
  2. void RunJob(string ticket); 立即返回。根据ticket,从存储(例如数据库或文件系统)读取输入数据,对每个数据元素进行相同的处理,并将结果保存回存储。批次通常包含许多声明。

用户可以调用ProcessClaim处理单个请求,调用RunJob运行批处理。多个批次可以同时运行。每个处理请求都包装为Task,因此所有请求都是并行执行的。 问题是不允许批处理通过调度大量请求来阻塞处理队列。换句话说,如果用户执行大批量,它将在相当长的时间内阻塞小批量和单个处理请求。 所以我想出了以下架构,Albahari 很好地描述了(非常简短):

public sealed class ProcessingQueue : IDisposable
{
    private class WorkItem
    {
        public readonly TaskCompletionSource<string> TaskSource;
        public readonly string Options;
        public readonly string Claim;
        public readonly CancellationToken? CancelToken;

        public WorkItem(
            TaskCompletionSource<string> taskSource,
            string options,
            string claim,
            CancellationToken? cancelToken)
        {
            TaskSource = taskSource;
            Options = options;
            Claim = claim;
            CancelToken = cancelToken;
        }
    }

    public ProcessingQueue()
        : this(Environment.ProcessorCount)
    {
    }

    public ProcessingQueue(int workerCount)
    {
        _taskQ = new BlockingCollection<WorkItem>(workerCount * 2);

        for (var i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consume);
    }

    public void Dispose()
    {
        _taskQ.CompleteAdding();
    }

    private readonly BlockingCollection<WorkItem> _taskQ;

    public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null)
    {
        var tcs = new TaskCompletionSource<string>();
        _taskQ.Add(new WorkItem(tcs, options, claim, cancelToken));
        return tcs.Task;
    }

    public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null)
    {
        return Task<string>.Factory.StartNew(() => ProcessItem(options, claim));
    }

    private void Consume()
    {
        foreach (var workItem in _taskQ.GetConsumingEnumerable())
        {
            if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
                workItem.TaskSource.SetCanceled();
            else
            {
                try
                {
                    workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim));
                }
                catch (Exception ex)
                {
                    workItem.TaskSource.SetException(ex);
                }
            }
        }
    }

    private static string ProcessItem(string options, string claim)
    {
        // do some actual work here
        Thread.Sleep(2000); // simulate work;
        return options + claim; // return final result
    }
}

静态方法ProcessRequest 可用于处理单个请求,而实例方法EnqueueTask - 用于批处理。当然,所有批次都必须使用ProcessingQueue 的单个共享实例。虽然这种方法效果很好,并且可以控制多个批次同时运行的速度,但我觉得有些问题:

  • 必须手动维护工作线程池
  • 难以猜测最佳工作线程数(我默认使用处理器内核数)
  • 当没有批处理正在运行时,线程束保持阻塞,浪费系统资源
  • 处理块工作线程的 IO 绑定部分会降低 CPU 使用效率

我想知道,有没有更好的方法来处理这种情况?

更新: 其中一个要求是为批处理提供全部功能,这意味着当用户执行一个批处理并且没有其他传入请求时,所有资源必须专用于处理该批处理。

【问题讨论】:

  • 听起来您想以更一致的方式分配负载。我会看一个服务总线来做到这一点。 nservicebus.com

标签: c# wcf concurrency task-parallel-library


【解决方案1】:

我会说,使用单个服务接口和单个托管容器来处理这两种截然不同的需求可能是错误的。

您应该将您的服务解耦为两个 - 一个按需返回对单个请求的响应,另一个将批处理查询排队并在单个线程上处理它们。

通过这种方式,您可以为实时消费者提供高可用性渠道,并为批量消费者提供离线渠道。这些可以作为单独的关注点进行部署和管理,允许您在每个服务接口上提供不同的服务级别。

只是我对提议的架构的想法。

更新

事实上,您的音量处理渠道是一个离线渠道。这意味着消费者将不得不排队等待,并且不确定的时间量才能让他们的请求返回。

那么作业队列呢?每个作业在处理过程中都会获得所有可用资源。处理完作业后,调用者会收到作业已完成的通知。

【讨论】:

  • 这是一个很好的观点,我绝对应该将这些功能分开。但是,要求之一是为批处理提供全部功能,这意味着当用户执行一个批处理并且没有其他传入请求时,所有资源必须专用于处理该批处理。您在单个线程中处理批处理的建议不符合此要求。另一方面,增加一个批次的线程数会导致相同的初始问题。
  • 我同意你的看法。我不知道如何最好地解决您的问题。您的挑战是以某种方式将更多线程用于处理批处理,同时最大限度地提高线程可用性。至少通过解耦您的渠道,您可以避免影响实时端点。
  • 感谢您的更新。事实上,我以类似的方式实现了它——单个请求不会被排队,而是像简单的 WCF 调用一样立即执行,而批处理请求则组织自己的队列来发出并发请求。我依靠这个美妙的 TPL Dataflow 库来完成我需要的所有魔法......
猜你喜欢
  • 1970-01-01
  • 2016-08-04
  • 2018-08-10
  • 2023-02-09
  • 2014-07-06
  • 1970-01-01
  • 1970-01-01
  • 2017-03-01
  • 2012-10-27
相关资源
最近更新 更多