【问题标题】:Monitor the queue of TCP messages awaiting transmission监控等待传输的 TCP 消息队列
【发布时间】:2013-03-13 20:26:27
【问题描述】:

我有一个使用 NetTcp 的 WCF 客户端和服务器。服务器托管在 Windows 服务内的 ServiceHost 中。客户端订阅 WCF 服务并注册它的回调接口和它的 InstanceContext。回调接口有几个单向方法调用。我把它全开。

这一切都很好。但是,在我的测试中,我的 Windows 服务中有代码,它通过一个紧密的循环通过单向方法调用之一尽可能快地将消息发送回客户端。我已经超出了 TCP 连接传递数据的能力,结果是消息排队。这是我所期望的。

问题是这样的:服务器上是否有任何方法可以了解队列的备份情况,以便我可以根据实时吞吐量限制发送消息的速度?

【问题讨论】:

    标签: performance wcf throughput wcf-callbacks


    【解决方案1】:

    我们从未为此找到答案,但我们创建了自己的解决方法,似乎可以解决问题。为了完整起见,我将在此处发布。我希望它可以帮助面临类似情况的其他人。

    要求:

    1. 我们有一个长时间运行的任务将在硬件服务器上运行。当我说长期运行时,我的意思是从一天到很多天。
    2. 我们希望有一个可以在网络中的任何其他桌面上启动的用户界面,以便以图形方式查看长期运行任务的统计信息。
    3. 用户界面可以多次启动和停止,并且可以同时出现多个实例。
    4. 用户界面不应对长时间运行的任务产生过多的负担。运行多个 UI 不应减慢它的速度。

    设计:

    1. 长时间运行的任务包含在 DLL 中。有一个带有 run() 方法的主类可以启动长时间运行的任务。
    2. 我们创建了一个将在硬件服务器上自动运行的 Windows 服务。
    3. Windows 服务将创建主类的实例并通过调用 run() 方法启动任务。
    4. Windows 服务还将创建一个 ServiceHost 实例并启动一个 WCF 服务实例。
    5. Windows 服务会将主类的引用传递给 WCF 服务。
    6. WCF 服务将为主类可以引发的六个事件创建处理程序。
    7. 从主类到 WCF 服务的所有通信都是通过引发这六个事件的一种方式。
    8. UI 将成为 WCF 服务的客户端,并且连接将使用 NetTcp 绑定。
    9. WCF 服务有一个 subscribe() 方法和一个 unsubscribe() 方法,以便潜在的 UI 可以加入和离开。
    10. 当 UI 调用 subscribe() 方法时,它会将唯一标识符作为字符串传递。 WCF 服务将标识符及其 OperationContext 放入 ConcurrentDictionary。
    11. 当 UI 调用 unsubscribe() 方法时,该条目将从 ConcurrentDictionary 中删除。
    12. UI 和 WCF 服务之间的契约具有从 WCF 服务到客户端的单向消息,用于处理长时间运行的任务可能引发的每种类型的事件。
    13. 在长时间运行的任务期间引发事件时,WCF 服务会处理该事件并遍历已注册的 UI 并向 UI 发送单向消息。

    此时所有这些都在起作用。

    问题:

    当我们对该系统进行压力测试时,我们创建了一个场景,其中长时间运行的任务以尽可能快的速度用事件轰炸 WCF 服务。这将是最坏的情况,但我们必须能够处理它。 WCF 服务能够处理事件并将消息放在 Tcp 通道上。由于消息是单向的,WCF 服务不会阻塞等待发送完成,这使其能够跟上正在引发的事件。

    当用户界面没有像服务器将消息推送到通道中那样快地从通道中拉出消息时,就会出现问题。消息会备份并最终开始超时并导致通道进入故障状态。我们希望在故障状态发生之前检测到这种情况,以便我们可以开始丢弃消息。不幸的是,我们找不到任何机制来检测此频道上的积压。如果我们将消息更改为双向,WCF 服务将阻塞,直到消息完成并且通道不会被备份,但是,这会影响长时间运行的服务并使其变慢。不好。

    解决方案:

    我们通过在包含长时间运行任务的同一个 DLL 中创建一个特殊类来解决这个问题。此类负责与任何附加的用户界面进行通信。此通信对象包含要引发的每个事件的 ConcurrentQueue。当长时间运行的任务通常将事件引发回 WCF 服务时,它现在将改为调用此通信对象中的方法。

    在此方法中,通信对象会将事件 args 输入到该事件的 ConcurrentQueue 中。通信对象还有一个方法,该方法在创建对象时在单独的线程上启动。这种新方法将不断循环通过 concurrentQueues 并弹出事件参数并实际引发事件。我们将 NetTcp 调用改为双向调用,因此线程中的例程将绑定到 TCP 通道的速度,但由于它在单独的线程中,它不会减慢长时间运行任务的主处理。

    现在我们有了一个可以使用的 ConcurrentQueue,我们可以检查积压工作。我们在逻辑上为并发队列设置了一些限制(在当前情况下为 10)。当长时间运行的任务调用该方法将事件 args 添加到队列中时,它首先检查队列的计数,如果它小于我们的逻辑限制,它将事件 args 入队,否则它只是将其丢弃并继续。这样长运行队列的速度不会受到影响,WCF 服务也不会备份并导致通道状态出错。

    总结:

    我们欢迎任何反馈或替代想法。这对我们来说似乎工作得很好,而且似乎是有弹性的。

    class UI
    {
        #region Class Scoped Variables
        private Int32 _threashold = 10;
        private bool _continue = true;
        #endregion Class Scoped Variables
    
        #region Public Delegate Definitions
        public delegate void OnPlanSelectionChangedDelegate(PlanSelectionChangedEventArgs e);
        // other lines deleted for brevity
        #endregion Public Delegate Definitions
    
        #region Local Delegate Instances
        private OnPlanSelectionChangedDelegate _onPlanSelectionChangedDelegate = null;
        // other lines deleted for brevity
        #endregion Local Delegate Instances
    
        #region Local Queues for Delegates
        private ConcurrentQueue<PlanSelectionChangedEventArgs> _planSelectionChangedQueue
            = new ConcurrentQueue<PlanSelectionChangedEventArgs>();
        // other lines deleted for brevity
        #endregion Local Queues for Delegates
    
        #region Constructor
        public UI(OnPlanSelectionChangedDelegate onPlanSelectionChanged)
        {
            _onPlanSelectionChangedDelegate = onPlanSelectionChanged;
            // other lines deleted for brevity
            ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork), null);
        }
        #endregion Constructor
    
        #region Public Methods
        public void Shutdown()
        {
            _continue = false;
        }
        public void SendPlanSelection(PlanSelectionChangedEventArgs e)
        {
            if (_planSelectionChangedQueue.Count < _threashold)
            {
                if (_cntPlanSelectionDropped > 0)
                {
                    e.Dropped = _cntPlanSelectionDropped;
                }
                _planSelectionChangedQueue.Enqueue(e);
                _cntPlanSelectionDropped = 0;
            }
            else
            {
                _cntPlanSelectionDropped++;
            }
        }
        // other lines deleted for brevity
        #endregion Public Methods
    
        #region Private Asychronous Method
        private void DoWork(object dummy)
        {
            PlanSelectionChangedEventArgs planSelectionChangedEventArgs = null;
            while (_continue)   // process this loop until told to quit
            {
                // Plan Selection Changed
                // Try to get the next event args in a thread safe way
                if (_planSelectionChangedQueue.TryDequeue(out planSelectionChangedEventArgs))
                {
                    // We got an event args from the queue, do we have a valid delegate?
                    if (_onPlanSelectionChangedDelegate != null)
                    {
                        // We have a delegate, call it with the event args and rais the event
                        _onPlanSelectionChangedDelegate(planSelectionChangedEventArgs);
                    }
                }
    
                // other lines deleted for brevity
            }
        }
        #endregion Private Asychronous Method
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-04-12
      • 1970-01-01
      • 1970-01-01
      • 2018-07-18
      • 2012-03-23
      • 2011-08-27
      • 2013-02-27
      • 1970-01-01
      相关资源
      最近更新 更多