【发布时间】:2011-02-11 16:10:50
【问题描述】:
我有一个流媒体服务器,它的合同看起来像这样:
[ServiceContract]
public interface IStreamingService
{
[OperationContract(Action = "StreamingMessageRequest", ReplyAction = "StreamingMessageReply")]
Message GetStreamingData(Message query);
}
这是一个基本的实现,删除了一些东西(如错误处理)以简化事情:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
[ErrorBehavior(typeof(StreamingServiceErrorHandler))]
public class StreamingService : IStreamingService
{
public StreamingService()
{
}
public Message GetStreamingData(Message query)
{
var dataQuery = query.GetBody<DataQuery>();
// Hook up events to let us know if the client disconnects so that we can stop the query...
EventHandler closeAction = (sender, ea) =>
{
dataQuery.Stop();
};
OperationContext.Current.Channel.Faulted += closeAction;
OperationContext.Current.Channel.Closed += closeAction;
Message streamingMessage = Message.CreateMessage(
MessageVersion.Soap12WSAddressing10,
"QueryMessageReply",
new StreamingBodyWriter(QueryMethod(dataQuery));
return streamingMessage;
}
public IEnumerable<object> QueryMethod (DataQuery query)
{
// Returns a potentially infinite stream of objects in response to the query
}
}
此实现使用自定义 BodyWriter 从 QueryMethod 流式传输结果:
public class StreamingBodyWriter : BodyWriter
{
public StreamingBodyWriter(IEnumerable items)
: base(false) // False should be passed here to avoid buffering the message
{
Items = items;
}
internal IEnumerable Items { get; private set; }
private void SerializeObject(XmlDictionaryWriter writer, object item)
{
// Serialize the object to the stream
}
protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
{
foreach (object item in Items)
{
SerializeObject(writer, item);
}
}
}
客户端连接并开始读取数据流。像这样的:
public IEnumerable<T> GetStreamingData<T>(Message queryMessage)
{
Message reply = _Server.GetStreamingData(queryMessage);
// Get a chunckable reader for the streaming reply
XmlReader reader = reply.GetReaderAtBodyContents();
// Read the stream of objects, deserializing each one as appropriate
while (!reader.EOF)
{
object item = DeserializeObject(reader);
if (item == null)
continue;
// Yield each item as it's deserialized, resulting in a [potentially] never-ending stream of objects
yield return (T)item;
}
}
这很好用,我将对象流返回给客户端。很不错。问题是当客户端断开中间流(优雅或不优雅)时。在任何一种情况下,服务器都不会收到有关此断开连接的通知,除非服务的错误处理程序发现了一个错误。
如您所见,我尝试在服务方法中挂钩 Faulted 和 Closed 通道事件,但是当客户端断开连接时它们不会触发。
我怀疑这些事件不会触发,因为使用流式合同,服务已经从操作方法 (GetStreamingData) 返回。此方法返回了带有自定义正文编写器的消息,并且此正文写入(在服务通道堆栈中较低)正在从计算线程(通过 IEnumerable)中获取结果并将它们流式传输到回复消息中。既然操作方法已经返回,那么我猜这些通道事件不会被触发。
绑定是 net.tcp 绑定(非双工)的自定义版本,只有两个绑定元素:BinaryMessageEncodingBindingElement 和 TcpTransportBindingElement。没有安全或任何东西。基本上只是带有二进制消息的原始 net.tcp。
问题是,当客户端断开连接时,我希望服务器停止正在生成结果并将它们提供给 BodyWriter 正在读取的 IEnumerable 的后台计算线程。正文作者已停止(显然),但线程继续存在。
那么我可以在哪里挂钩以发现客户端是否在中途断开连接?
更新:
有两种主要的断开连接情况: 显式断开连接,由于客户端代理的处置或客户端进程的终止;被动断开连接,通常是由于网络故障(例如:ISP 断开连接,或网线被拉断)。
在第一种情况下,当服务器尝试将新数据发送到流中时,会收到一个明确的连接异常。没问题。
第二种情况是网管断了比较麻烦。通常这种情况是根据发送超时检测到的,但由于这是一个流接口,所以发送超时会被调高(我把它调到几天,因为可以想象流数据传输可以持续那么长时间)。因此,当客户端由于网络管道不稳定而断开连接时,服务会继续通过不再存在的连接发送数据。
此时我不知道解决第二个选项的好方法。有什么建议吗?
【问题讨论】:
-
我也遇到过这个问题,配置和你几乎一模一样。我需要知道客户端何时断开连接,以便快速处理稀缺资源。您是否曾经为此找到解决方案或解决方法?谢谢。