【问题标题】:Returning data from a queue of method calls从方法调用队列中返回数据
【发布时间】:2015-09-14 19:06:29
【问题描述】:

我正在尝试创建一个队列来限制和重试对服务的 API 调用。

劣质图:

队列需要接受多种类型的调用(我想我已经搞定了),将一个方法调用队列及其参数存储到 API 库中。如果调用失败,则需要在队列顶部重试(想通了)。然后它需要将数据返回到程序中进行原始调用的方法(或回调到另一个方法),即使该调用由于队列而延迟了很长一段时间。一直没有阻塞用户界面。

除了我将数据返回给原始调用者的部分之外,我已经弄明白了大部分内容。或者至少是该类中将接受数据的方法(如回调)。

我该怎么做呢?当可以从任意数量的类进行调用时,如何将队列中的数据返回给程序的其余部分?如果我要利用回调,我在哪里/如何将回调信息存储在队列中?

队列会做什么的另一个伪劣图表:

【问题讨论】:

    标签: c# .net asynchronous queue


    【解决方案1】:

    下面的代码会给你一个想法。

    // this is common practice for genertic classes like BaseApiRequest<T> -
    // create parent class which does not have generic parameters
    public abstract class BaseApiRequest : IDisposable {
        public abstract void Dispose();
        public abstract void SetException(Exception ex);
    }
    
    public abstract class BaseApiRequest<T> : BaseApiRequest {
        private readonly ManualResetEventSlim _signal;
        private Exception _exception;
        private T _result;
    
        protected BaseApiRequest() {
            _signal = new ManualResetEventSlim(false);
        }
    
        public T GetResult() {
            _signal.Wait();
            if (_exception != null)
                throw new Exception("Exception during request processing. See inner exception for details", _exception);
            return _result;
        }
    
        public T GetResult(CancellationToken token) {
            _signal.Wait(token);
            if (_exception != null)
                throw new Exception("Exception during request processing. See inner exception for details", _exception);
            return _result;
        }
    
        public bool TryGetResult(TimeSpan timeout, out T result) {
            result = default(T);
            if (_signal.Wait(timeout)) {
                if (_exception != null)
                    throw new Exception("Exception during request processing. See inner exception for details", _exception);
                result = _result;
                return true;
            }
            return false;
        }        
    
        public void SetResult(T result) {
            _result = result;
            _signal.Set();
            var handler = ResultReady;
            if (handler != null)
                handler(this, new ResultReadyEventArgs<T>(_result));
        }
    
        public override void SetException(Exception ex) {
            _exception = ex;
            _signal.Set();
            var handler = ResultReady;
            if (handler != null)
                handler(this, new ResultReadyEventArgs<T>(_exception));
        }
    
        public override void Dispose() {
            _signal.Dispose();
        }
    
        public event EventHandler<ResultReadyEventArgs<T>> ResultReady;
    
        public class ResultReadyEventArgs<T> : EventArgs {
            public ResultReadyEventArgs(T result) {
                this.Result = result;
                this.Success = true;
            }
    
            public ResultReadyEventArgs(Exception ex)
            {
                this.Exception = ex;
                this.Success = false;
            }
    
            public bool Success { get; private set; }
            public T Result { get; private set; }
            public Exception Exception { get; private set; }
        }
    }    
    

    这可能是您的 api 请求的基类。处理请求时,您的处理器会调用 SetResult。调用者创建请求,将其发布到您的队列,然后有选项:

    1. 调用者需要同步结果。然后他调用 GetResult,这是一个阻塞调用。如果他不想永远等待以防万一出现问题,调用者可以使用 TryGetResult 超时。

    2. 如果不需要立即获取结果,调用者可以订阅 ResultReady 事件,或者稍后再调用 GetResult。

    现在要处理您的请求,您有多种选择。一种是向 BaseApiRequest 添加句柄逻辑权。然后,您从处理器中的队列中提取 BaseApiRequest 并在其上调用 Process 。您可能会说请求不应该包含处理自身的逻辑,因为它只是请求。然后考虑更复杂一点的类结构:

    public interface IApiRequestHandler {
        // type of request which is handled by this handler
        Type RequestType { get; }
        void Validate(BaseApiRequest request);
        void Process(BaseApiRequest request);
    }
    
    // specific request
    public class SomeDataRequest : BaseApiRequest<int> {
        public string Argument1 { get; set; }
        public long Argument2 { get; set; }
    }
    
    // specific request handler
    public class SomeDataRequestHandler : IApiRequestHandler {
        public Type RequestType { get { return typeof(SomeDataRequest); } }
    
        public void Validate(BaseApiRequest baseRequest) {
            // safe to cast here
            var request = (SomeDataRequest) baseRequest;
            // validate and throw exception if something is wrong
            // no reason to validate when we already started processing
        }
    
        public void Process(BaseApiRequest baseRequest) {
            // safe to cast here
            var request = (SomeDataRequest) baseRequest;
            // do processing
            request.SetResult(1);
        }
    }
    

    然后,您的 api 将如下所示:

        // this should be singleton
    public class Api : IDisposable {
        private readonly BlockingCollection<BaseApiRequest> _requests = new BlockingCollection<BaseApiRequest>(new ConcurrentQueue<BaseApiRequest>());
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
        private readonly Dictionary<Type, IApiRequestHandler> _handlers = new Dictionary<Type, IApiRequestHandler>();
    
        public Api() {
            // find or explicitly register handlers in some way
            // here we just search them in current assembly
            foreach (var type in Assembly.GetExecutingAssembly().GetTypes().Where(c => typeof (IApiRequestHandler).IsAssignableFrom(c))) {
                var handler = (IApiRequestHandler) Activator.CreateInstance(type);
                if (_handlers.ContainsKey(handler.RequestType))
                    throw new Exception(String.Format("Request handler for request type {0} already registered.", handler.RequestType));
                _handlers.Add(handler.RequestType, handler);
            }
    
            new Thread(ProcessingLoop) {IsBackground = true}.Start();
        }
    
        private void ProcessingLoop() {
            try {
                foreach (var request in _requests.GetConsumingEnumerable(_cts.Token)) {
                    try {
                        // no casting from object or switches here
                        _handlers[request.GetType()].Process(request);
                    }
                    catch (Exception ex) {
                       request. SetException(ex);
                    }
                }
            }
            catch (OperationCanceledException) {
                return;
            }
        }
    
        public void StartProcessing(BaseApiRequest request) {
            if (_handlers.ContainsKey(request.GetType()))
                throw new Exception("No handlers registered for request type " + request.GetType());
            // validate synchronously
            _handlers[request.GetType()].Validate(request);
            _requests.Add(request);
        }
    
        public void Dispose() {
            _cts.Cancel();
        }
    }
    

    【讨论】:

    • 抱歉回复晚了。好久不见了,谢谢你的回复。所以如果我理解正确的话。我将创建一个BaseAPIRequest,将其传递给我的队列,它会按顺序处理它,并在完成后调用SetResult。此时,如果我订阅了ResultReady 事件,我会在结果触发时收到回调。否则我需要同步使用GetResult 或异步轮询TryGetResult。然后我会通过调用Dispose? 来处理该对象
    • 对。 TryGetResult 也是为了避免死锁 - 如果在该超时后没有得到结果,则在那里传递相对较长的超时并抛出异常。它通常更安全。您还可以添加 GetResult 接受 CancellationToken - 将该令牌传递给 Wait 方法(在这种情况下使用 ManualResetEventSlim 类)。
    • 所以在稍微修改之后,我遇到了一种令人讨厌的代码气味。由于每个请求都采用不同的参数(作为 API 库中的一种请求类型)并返回不同的数据(作为结果类型)。我需要将它们作为object 而不是它们各自的类型存储到队列中。然后,我需要在出队时将它们转换回各自的类型,以便进行 API 调用。这在某种程度上感觉是错误的,因为向上转换它们只是一个非常长的 switch 语句,涵盖了所有可能的请求。在您看来,这是一件“好的”事情吗?如果没有,有什么建议吗?
    • @douglasg14b 考虑我的扩展和编辑答案,以获取有关如何处理该问题的更多选项。
    • 感谢所有帮助!好吧,我想我已经掌握了这将如何工作。在这种情况下,我将最终为每个请求提供两个类,一个用于请求本身,一个用于结果。出于好奇,这比 switch 语句有什么优势? (有十几个类与一个有十几个可能性的 switch 语句)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-20
    • 2017-08-14
    • 1970-01-01
    相关资源
    最近更新 更多