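Introduction:
The previous post gave a brief analysis of the entity classes that the COMET framework operates on. This post looks at how the framework handles thread control. Criticism and corrections are welcome.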
Core framework:
Figure 1.1 The COMET core framework
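The CometWaitRequest class:
CometWaitRequest is an abstraction of the control information for a single request. It holds all the control information that is needed and is used directly by CometWaitThread. Put simply, publishing server events requires threads to manage them, and CometWaitRequest abstracts the control information for those events. One user can trigger several client requests, and the server has to map those requests; whatever a user submits to the server must be managed by the corresponding thread. Queuing events, then queuing requests and queuing messages, all need direct management by a thread. This class is the abstraction of the management information for one client.
PS: Note that the question mark in "DateTime?" should not be removed. It makes DateDeactivated a nullable value that can be assigned null. A timeout may or may not exist, so there is no need to give DateDeactivated a value in advance. An instance of CometAsyncResult is created inside this class.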
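The point about "DateTime?" can be illustrated with a small sketch. WaitRequestSketch and its members are hypothetical stand-ins, not the framework's CometWaitRequest; only the DateDeactivated name comes from the text above.

```csharp
using System;

// Hypothetical stand-in for the deactivation-related part of CometWaitRequest.
public class WaitRequestSketch
{
    // null means "not deactivated yet"; no placeholder date is needed.
    public DateTime? DateDeactivated { get; set; }

    public bool IsDeactivated
    {
        get { return DateDeactivated.HasValue; }
    }

    // Seconds since deactivation, or 0 while the request is still active.
    public double SecondsSinceDeactivation(DateTime now)
    {
        return DateDeactivated.HasValue
            ? (now - DateDeactivated.Value).TotalSeconds
            : 0.0;
    }
}

public static class Program
{
    public static void Main()
    {
        var request = new WaitRequestSketch();
        Console.WriteLine(request.IsDeactivated);   // False

        DateTime start = new DateTime(2010, 1, 1, 12, 0, 0);
        request.DateDeactivated = start;
        Console.WriteLine(request.IsDeactivated);   // True
        Console.WriteLine(request.SecondsSinceDeactivation(start.AddSeconds(30))); // 30
    }
}
```

With a plain DateTime the field would always hold some date, and the code would need a sentinel value to mean "never deactivated".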
The CometAsyncResult class:
This class is also interesting. MSDN describes the IAsyncResult interface as follows:
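“The IAsyncResult interface is implemented by classes containing methods that can operate asynchronously. It is the return type of methods that initiate an asynchronous operation, such as FileStream.BeginRead, and it is passed to methods that conclude an asynchronous operation, such as FileStream.EndRead. IAsyncResult objects are also passed to methods invoked by AsyncCallback delegates when an asynchronous operation completes.
An object that supports the IAsyncResult interface stores state information for an asynchronous operation and provides a synchronization object to allow threads to be signaled when the operation completes.”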
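The four members of IAsyncResult can be seen in a minimal implementation. This is only a sketch: SimpleAsyncResult, its Result property and its Complete method are hypothetical and are not the framework's CometAsyncResult, whose source is not shown in this post.

```csharp
using System;
using System.Threading;

// Hypothetical minimal IAsyncResult; not the framework's CometAsyncResult.
public class SimpleAsyncResult : IAsyncResult
{
    private readonly ManualResetEvent waitHandle = new ManualResetEvent(false);
    private readonly AsyncCallback callback;
    private readonly object asyncState;
    private volatile bool isCompleted;

    public SimpleAsyncResult(AsyncCallback callback, object asyncState)
    {
        this.callback = callback;
        this.asyncState = asyncState;
    }

    // Payload of the finished operation (an assumption of this sketch).
    public string Result { get; private set; }

    // The four members required by IAsyncResult.
    public object AsyncState { get { return asyncState; } }
    public WaitHandle AsyncWaitHandle { get { return waitHandle; } }
    public bool CompletedSynchronously { get { return false; } }
    public bool IsCompleted { get { return isCompleted; } }

    // Called by whoever finishes the operation: store the result,
    // signal waiting threads, then invoke the callback.
    public void Complete(string result)
    {
        Result = result;
        isCompleted = true;
        waitHandle.Set();
        if (callback != null)
            callback(this);
    }
}

public static class Program
{
    public static void Main()
    {
        var asyncResult = new SimpleAsyncResult(
            ar => Console.WriteLine("callback, state = " + ar.AsyncState),
            "request-1");

        // Finish the operation on another thread.
        var worker = new Thread(() => asyncResult.Complete("hello"));
        worker.Start();

        asyncResult.AsyncWaitHandle.WaitOne();  // block until signaled
        worker.Join();                          // let the callback finish too
        Console.WriteLine(asyncResult.IsCompleted);
        Console.WriteLine(asyncResult.Result);
    }
}
```

The wait handle is the "synchronization object" from the quote: a thread that calls WaitOne is released as soon as Complete signals it.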
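Having CometAsyncResult implement the IAsyncResult interface brings several benefits. First, it fits directly into the .NET Framework asynchronous pattern, in which an operation is started and finished through a Begin/End pair of methods and completion is reported through an AsyncCallback delegate. MSDN describes the delegate form of this pattern, BeginInvoke and EndInvoke, as follows: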
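“The .NET Framework enables you to call any method asynchronously. To do this you define a delegate with the same signature as the method you want to call; the common language runtime automatically defines BeginInvoke and EndInvoke methods for this delegate, with the appropriate signatures.
The BeginInvoke method initiates the asynchronous call. It has the same parameters as the method that you want to execute asynchronously, plus two additional optional parameters. The first parameter is an AsyncCallback delegate that references a method to be called when the asynchronous call completes. The second parameter is a user-defined object that passes information into the callback method. BeginInvoke returns immediately and does not wait for the asynchronous call to complete. BeginInvoke returns an IAsyncResult, which can be used to monitor the progress of the asynchronous call.
The EndInvoke method retrieves the results of the asynchronous call. It can be called any time after BeginInvoke. If the asynchronous call has not completed, EndInvoke blocks the calling thread until it completes. The parameters of EndInvoke include the out and ref parameters (<Out> ByRef and ByRef in Visual Basic) of the method that you want to execute asynchronously, plus the IAsyncResult returned by BeginInvoke.”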
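Reference: http://msdn.microsoft.com/zh-cn/library/2e08f6yc(VS.80).aspx
PS: If you only care about using the framework, you can skip this part, because extending the framework does not require changing the structure of this class.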
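The CometWaitThread class:
The CometWaitThread class is the worker thread that manages CometWaitRequest objects. In the code below, the worker threads are created once, in the CometStateManager constructor; when a client subscribes, its CometWaitRequest is queued on one of them.
PS: This post gives only a brief introduction to thread control. At this point the core of the COMET framework is all implementation mechanism, and the threads exist merely as a tool for managing the various "channels", so they need no extension. As a research topic, though, there is room to dig deeper: for example, adding a scheduling algorithm to optimize message control, or giving particular messages (such as emergency events) priority when the threads publish them.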
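The bookkeeping such a worker needs can be sketched as follows. The internals of CometWaitThread are not shown in this post, so everything here is an assumption: WaitThreadSketch, PendingRequest and CollectTimedOut are hypothetical, and only the QueueCometWaitRequest and DequeueCometWaitRequest names are taken from the calls in CometStateManager.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical pending request: only the fields this sketch needs.
public class PendingRequest
{
    public string PrivateToken;
    public DateTime Deadline;
}

// Hypothetical worker bookkeeping, modeled on the Queue/Dequeue calls
// that CometStateManager makes on each CometWaitThread.
public class WaitThreadSketch
{
    private readonly object sync = new object();
    private readonly List<PendingRequest> requests = new List<PendingRequest>();

    public void QueueCometWaitRequest(PendingRequest request)
    {
        lock (sync) { requests.Add(request); }
    }

    // Drop any earlier request from the same client.
    public void DequeueCometWaitRequest(string privateToken)
    {
        lock (sync) { requests.RemoveAll(r => r.PrivateToken == privateToken); }
    }

    // One polling pass: remove and return the requests whose deadline has passed.
    public List<PendingRequest> CollectTimedOut(DateTime now)
    {
        lock (sync)
        {
            List<PendingRequest> expired = requests.FindAll(r => r.Deadline <= now);
            requests.RemoveAll(r => r.Deadline <= now);
            return expired;
        }
    }

    public int Count
    {
        get { lock (sync) { return requests.Count; } }
    }
}

public static class Program
{
    public static void Main()
    {
        DateTime now = new DateTime(2010, 1, 1, 12, 0, 0);
        var worker = new WaitThreadSketch();
        worker.QueueCometWaitRequest(new PendingRequest { PrivateToken = "a", Deadline = now.AddSeconds(5) });
        worker.QueueCometWaitRequest(new PendingRequest { PrivateToken = "b", Deadline = now.AddSeconds(60) });

        List<PendingRequest> expired = worker.CollectTimedOut(now.AddSeconds(10));
        Console.WriteLine(expired.Count);   // 1
        Console.WriteLine(worker.Count);    // 1
    }
}
```

A real worker would run a pass like CollectTimedOut in a loop on its own thread, sleeping between passes, and would also complete requests for which messages have arrived. A priority scheme for urgent messages, as suggested above, could be added by ordering the list before each pass.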
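The CometStateManager class:
CometStateManager is the core of the whole COMET mechanism. It ties CometWaitThread together with ICometStateProvider, CometClient and CometMessage to form the complete COMET application framework. It is the class you must modify when extending a COMET application.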
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Web;
using System.Web.UI;
using System.Runtime.Serialization.Json;
namespace MethodWorx.AspNetComet.Core
{
/// <summary>
/// CometStateManager class
///
/// An instance of this class is used to manage the state of the COMET application.
/// This class manages an ICometStateProvider instance
/// </summary>
public class CometStateManager
{
private ICometStateProvider stateProvider;
private int workerThreadCount;
private int maximumTimeSlot;
private int currentThread = 0;
private CometWaitThread[] workerThreads;//
private object state = new object();
/// <summary>
/// Event that is called when a Client is Initialized
/// </summary>
public event CometClientEventHandler ClientInitialized;
/// <summary>
/// Event that is called when a Client is killed
/// </summary>
public event CometClientEventHandler IdleClientKilled;
/// <summary>
/// Event that is called when a Client subscribes to this channel
/// </summary>
public event CometClientEventHandler ClientSubscribed;
/// <summary>
/// Construct an instance of the CometStateManager class and pass in an
/// instance of an ICometStateProvider to manage the persistence of the state
/// </summary>
/// <param name="stateProvider">An instance of an ICometStateProvider class that manages the persistence of the state</param>
/// <param name="workerThreadCount">How many worker threads this CometStateManager should initialize</param>
/// <param name="maximumTimeSlot">The maximum time in milliseconds to idle between polling each COMET client within a worker thread</param>
public CometStateManager(ICometStateProvider stateProvider, int workerThreadCount, int maximumTimeSlot)
{
if (stateProvider == null)
throw new ArgumentNullException("stateProvider");
if (workerThreadCount <= 0)
throw new ArgumentOutOfRangeException("workerThreadCount");
// ok, setup the member of this class
this.stateProvider = stateProvider;
this.workerThreadCount = workerThreadCount;
this.maximumTimeSlot = maximumTimeSlot;
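// size the array from the parameter; a fixed size of 5 overflows when workerThreadCount > 5
this.workerThreads = new CometWaitThread[workerThreadCount];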
//
// ok, lets fireup the threads
for(int i = 0; i < workerThreadCount; i ++)
{
this.workerThreads[i] =
new CometWaitThread(this);
}
}
/// <summary>
/// Construct an instance of the CometStateManager class and pass in an
/// instance of an ICometStateProvider to manage the persistence of the state
///
/// This calls the main constructor and specifies the default values workerThreadCount = 5 and maximumTimeSlot = 100. These values
/// can be tuned for your application by using the main constructor
/// </summary>
/// <param name="stateProvider">An instance of an ICometStateProvider class that manages the persistence of the state</param>
public CometStateManager(ICometStateProvider stateProvider)
: this(stateProvider, 5, 100)
{
}
/// <summary>
/// Creates a CometClient instance that is persisted into the ICometStateProvider instance. This needs to be
/// called by the server prior to the client connecting from a client application.
/// </summary>
/// <remarks>
/// This method will typically be used after a login script is executed, either from a standard ASP.NET form
/// or an Ajax Method etc...
///
/// The server would validate the user information and, if successful, initialize a client in the COMET engine ready
/// for the client to connect.
/// </remarks>
/// <param name="publicToken">The public token of the client, this token is used to identify the client to other clients</param>
/// <param name="privateToken">The private token of the client, this token is used to identify the client to itself</param>
/// <param name="displayName">The display name of the client, can be used to hold a friendly display name of the client</param>
/// <param name="connectionTimeoutSeconds">The number of seconds the client will be connected to the server for, until it needs to reestablish a connection because no messages have been sent</param>
/// <param name="connectionIdleSeconds">The number of seconds the server will wait for the client to reconnect before it treats it as an idle connection and removes it from the server</param>
/// <returns>An initialized CometClient object that represents the initialized client</returns>
public CometClient InitializeClient(string publicToken, string privateToken, string displayName, int connectionTimeoutSeconds, int connectionIdleSeconds)
{
// validate the parameters
if (string.IsNullOrEmpty(publicToken))
throw new ArgumentNullException("publicToken");
if (string.IsNullOrEmpty(privateToken))
throw new ArgumentNullException("privateToken");
if (string.IsNullOrEmpty(displayName))
throw new ArgumentNullException("displayName");
if (connectionIdleSeconds <= 0)
throw new ArgumentOutOfRangeException("connectionIdleSeconds", "connectionIdleSeconds must be greater than 0");
if (connectionTimeoutSeconds <= 0)
throw new ArgumentOutOfRangeException("connectionTimeoutSeconds", "connectionTimeoutSeconds must be greater than 0");
CometClient cometClient = new CometClient();
// ok, set it up
cometClient.ConnectionIdleSeconds = connectionIdleSeconds;
cometClient.ConnectionTimeoutSeconds = connectionTimeoutSeconds;
cometClient.DisplayName = displayName;
cometClient.LastActivity = DateTime.Now;
cometClient.PrivateToken = privateToken;
cometClient.PublicToken = publicToken;
//
// send this to the state provider
this.stateProvider.InitializeClient(cometClient);
// ok, fire the event
this.FireClientInitialized(cometClient);
return cometClient;
}
/// <summary>
/// Called from an Asynchronous HttpHandler Method to begin the Subscribe call
/// </summary>
/// <param name="context">HttpContext passed in from the handler</param>
/// <param name="callback">AsyncCallback passed in from the handler</param>
/// <param name="extraData">AsyncState passed in from the handler</param>
/// <returns>An IAsyncResult used to identify and control the asynchronous operation</returns>
public IAsyncResult BeginSubscribe(HttpContext context, AsyncCallback callback, object extraData)
{
try
{
long lastMessageId;
string privateToken;
if (!long.TryParse(context.Request["lastMessageId"] ?? "-1", out lastMessageId))
throw CometException.CometHandlerParametersAreInvalidException();
privateToken = context.Request["privateToken"];
if (string.IsNullOrEmpty(privateToken))
throw CometException.CometHandlerParametersAreInvalidException();
this.DebugWriteThreadInfo("BeginSubscribe");
lock (state)
{
//
// get the comet client
CometClient cometClient = this.GetCometClient(privateToken);
// ok, fire the event
this.FireClientSubscribed(cometClient);
// kill the previous one if one exists
// from the thread pool
for (int i = 0; i < this.workerThreadCount; i++)
{
this.workerThreads[i].DequeueCometWaitRequest(privateToken);
}
// ok, this is our result, so lets queue it
CometWaitRequest request = new CometWaitRequest(privateToken, lastMessageId, context, callback, extraData);
// we have our request so lets queue it on a thread
this.workerThreads[this.currentThread].QueueCometWaitRequest(request);
// cycle the thread count
this.currentThread++;
if (this.currentThread >= this.workerThreadCount)
this.currentThread = 0; // cycle back to 0
return request.Result;
}
}
catch (Exception ex)
{
this.WriteErrorToResponse(context, ex.Message);
return null;
}
}
/// <summary>
/// Called from an Asynchronous HttpHandler Method to complete the Subscribe call
/// </summary>
/// <param name="result">The IAsyncResult instance that was initialized in the BeginSubscribe call</param>
public void EndSubscribe(IAsyncResult result)
{
this.DebugWriteThreadInfo("EndSubscribe");
CometAsyncResult cometAsyncResult = result as CometAsyncResult;
if (cometAsyncResult != null)
{
try
{
// get the messages
CometMessage[] messages = cometAsyncResult.CometMessages;
// serialize the messages
// back to the client
if (messages != null && messages.Length > 0)
{
List<Type> knownTypes = new List<Type>();
foreach (CometMessage message in messages)
{
if (message.Contents != null)
{
Type knownType = message.Contents.GetType();
if (!knownTypes.Contains(knownType))
{
knownTypes.Add(knownType);
}
}
}
DataContractJsonSerializer serializer = new DataContractJsonSerializer(messages.GetType(), knownTypes);
serializer.WriteObject(cometAsyncResult.Context.Response.OutputStream, messages);
}
}
catch (Exception ex)
{
// write the error back to the client
this.WriteErrorToResponse(cometAsyncResult.Context, ex.Message);
}
}
}
/// <summary>
/// Send a message to a specific client
/// </summary>
/// <param name="clientPublicToken">The public token of the client</param>
/// <param name="name">The name of the message</param>
/// <param name="contents">The contents of the message</param>
public void SendMessage(string clientPublicToken, string name, object contents)
{
this.stateProvider.SendMessage(clientPublicToken, name, contents);
}
/// <summary>
/// Send a message to all clients
/// </summary>
/// <param name="name">The name of the message</param>
/// <param name="contents">The contents of the message</param>
public void SendMessage(string name, object contents)
{
this.stateProvider.SendMessage(name, contents);
}
/// <summary>
/// Gets the ICometStateProvider instance this manager consumes
/// </summary>
internal ICometStateProvider StateProvider
{
get { return this.stateProvider; }
}
/// <summary>
/// Register the required javascript for the page
/// </summary>
/// <param name="page">The page we want to write the scripts to</param>
public static void RegisterAspNetCometScripts(Page page)
{
page.ClientScript.RegisterClientScriptResource(typeof(CometStateManager), "MethodWorx.AspNetComet.Core.Scripts.AspNetComet.js");
}
/// <summary>
/// Kill an IdleCometClient
/// </summary>
/// <param name="clientPrivateToken">The private token of the client to kill</param>
public void KillIdleCometClient(string clientPrivateToken)
{
// get the comet client
CometClient cometClient = this.stateProvider.GetCometClient(clientPrivateToken);
// ok, time the client out
this.stateProvider.KillIdleCometClient(clientPrivateToken);
// and fire
this.FireIdleClientKilled(cometClient);
}
public CometClient GetCometClient(string clientPrivateToken)
{
return this.stateProvider.GetCometClient(clientPrivateToken);
}
internal void DebugWriteThreadInfo(string message)
{
int workerAvailable = 0;
int completionPortAvailable = 0;
ThreadPool.GetAvailableThreads(out workerAvailable, out completionPortAvailable);
Debug.WriteLine(string.Format("{0}: {1} {2} out of {3}/{4}", message, Thread.CurrentThread.IsThreadPoolThread, Thread.CurrentThread.ManagedThreadId, workerAvailable, completionPortAvailable));
}
internal void FireClientInitialized(CometClient cometClient)
{
if (this.ClientInitialized != null)
this.ClientInitialized(this, new CometClientEventArgs(cometClient));
}
internal void FireIdleClientKilled(CometClient cometClient)
{
if (this.IdleClientKilled != null)
this.IdleClientKilled(this, new CometClientEventArgs(cometClient));
}
internal void FireClientSubscribed(CometClient cometClient)
{
if (this.ClientSubscribed != null)
this.ClientSubscribed(this, new CometClientEventArgs(cometClient));
}
private void WriteErrorToResponse(HttpContext context, string message)
{
//
// ok, we have had an error so we have to return it
CometMessage errorMessage = new CometMessage();
errorMessage.Name = "aspNetComet.error";
errorMessage.MessageId = 0;
errorMessage.Contents = message;
CometMessage[] messages = new CometMessage[] { errorMessage };
DataContractJsonSerializer serializer = new DataContractJsonSerializer(messages.GetType());
serializer.WriteObject(context.Response.OutputStream, messages);
context.Response.End();
}
}
}
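BeginSubscribe hands each new request to the next worker thread in turn and wraps back to 0 after the last one. That round-robin step can be isolated in a small sketch. RoundRobinSelector is a hypothetical helper, not part of the framework; it uses its own lock in place of the lock (state) block above.

```csharp
using System;

// Hypothetical helper isolating the round-robin step from BeginSubscribe.
public class RoundRobinSelector
{
    private readonly object sync = new object();
    private readonly int slotCount;
    private int current;

    public RoundRobinSelector(int slotCount)
    {
        if (slotCount <= 0)
            throw new ArgumentOutOfRangeException("slotCount");
        this.slotCount = slotCount;
    }

    // Returns the slot to use now and advances to the next one,
    // wrapping back to 0 after the last slot.
    public int Next()
    {
        lock (sync)
        {
            int chosen = current;
            current = (current + 1) % slotCount;
            return chosen;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var selector = new RoundRobinSelector(3);
        for (int i = 0; i < 7; i++)
            Console.Write(selector.Next() + " ");   // 0 1 2 0 1 2 0
        Console.WriteLine();
    }
}
```

Round-robin spreads requests evenly by count, not by load; a worker holding many long-lived requests gets the same share of new ones as an idle worker.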
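CometClientEventHandler.cs:
CometClientEventHandler.cs defines the delegate type used to broadcast client events (ClientInitialized, IdleClientKilled and ClientSubscribed in the code above). It rarely needs changes.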
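The file itself is not listed in this post, but its likely shape can be inferred from how the events are raised in CometStateManager: a handler called with the sender and a CometClientEventArgs built from a CometClient. The sketch below is an assumption built on that inference; the CometClient stub, the CometClient property on the event arguments and ManagerSketch are all hypothetical stand-ins.

```csharp
using System;

// Stub standing in for the framework's CometClient (assumption).
public class CometClient
{
    public string DisplayName;
}

// Event arguments carrying the client the event is about.
public class CometClientEventArgs : EventArgs
{
    private readonly CometClient cometClient;

    public CometClientEventArgs(CometClient cometClient)
    {
        this.cometClient = cometClient;
    }

    public CometClient CometClient
    {
        get { return cometClient; }
    }
}

// Signature inferred from the call sites: handler(sender, args).
public delegate void CometClientEventHandler(object sender, CometClientEventArgs args);

// Hypothetical publisher mirroring FireClientInitialized.
public class ManagerSketch
{
    public event CometClientEventHandler ClientInitialized;

    public void Initialize(CometClient client)
    {
        // Copy to a local first so an unsubscribe on another thread
        // cannot null the event between the check and the call.
        CometClientEventHandler handler = ClientInitialized;
        if (handler != null)
            handler(this, new CometClientEventArgs(client));
    }
}

public static class Program
{
    public static void Main()
    {
        var manager = new ManagerSketch();
        manager.ClientInitialized += (sender, args) =>
            Console.WriteLine("initialized: " + args.CometClient.DisplayName);

        manager.Initialize(new CometClient { DisplayName = "alice" });  // initialized: alice
    }
}
```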
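Summary:
This completes the core of the COMET framework. For health reasons, and with my work schedule in mind, the next post moves straight on to applications, with sample applications prepared for readers.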