【发布时间】:2012-08-14 11:05:31
【问题描述】:
我正在编写一个有趣的 .NET XMPP 库,正如 elsewhere 所讨论的那样,.NET 4.5 之前的版本中的 XmlReader 实现不适合从 NetworkStream 解析 XML,因为它不会开始解析,直到它填满内部 4KB 缓冲区或达到 EOF。
其他库完全不使用XmlReader 解决了这个问题。正如前面链接的问题中提到的,jabber-net 使用 Java XML 解析器的一个端口。我在搜索时发现的一个实现,Babel IM,使用它自己的simple XML parser。我不确定 agsXMPP 是做什么的。
然而,随着 .NET 4.5 的发布和新的异步功能XmlReader 显然得到了升级,现在可以做到真正的async parsing。因此,我使用它来构建一个相当简单的 XMPP 客户端,该客户端可以连接到服务器并发送和接收消息。
然而,真正的症结似乎在于与服务器的断开。在断开连接时,我通常只想 Dispose() 我的 XmlReader 实例和底层流。但是,Dispose() 实际上会抛出 InvalidOperationException 消息“异步操作已在进行中”。如果您在异步时调用它...以及消息中的内容。然而,由于 XMPP 的性质,我的XmlReader 基本上不断地执行异步操作,因为它等待来自服务器的 XML 节从管道中下来。
XmlReader 上似乎没有任何methods 可以用来告诉它取消任何挂起的异步操作,以便我可以干净地Dispose() 它。 有没有比简单地不尝试处理XmlReader 更好的方法来处理这种情况? XMPP spec 声明服务器应该在断开连接时发送关闭</stream:stream> 标记.我可以将此作为一个信号,不要尝试执行另一次异步读取,因为管道中应该没有其他内容,但不能保证这一点。
这里有一些示例代码可供使用。 LongLivedTextStream 基本上模拟了一个打开的 NetworkStream,因为它永远不会到达 EOF,并且会阻塞直到可以读取至少 1 个字节。您可以向其中“注入”XML 文本,XmlReader 会很乐意解析,但尝试处置阅读器会触发上述异常。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace Example
{
class LongLivedTextStream : Stream
{
ManualResetEvent moarDatas = new ManualResetEvent(false);
List<byte> data = new List<byte>();
int pos = 0;
public void Inject(string text)
{
data.AddRange(new UTF8Encoding(false).GetBytes(text));
moarDatas.Set();
}
public override int Read(byte[] buffer, int offset, int count)
{
var bytes = GetBytes(count).ToArray();
for (int i = 0; offset + i < buffer.Length && i < bytes.Length; i++)
{
buffer[offset + i] = bytes[i];
}
return bytes.Length;
}
private IEnumerable<byte> GetBytes(int count)
{
int returned = 0;
while (returned == 0)
{
if (pos < data.Count)
{
while (pos < data.Count && returned < count)
{
yield return data[pos];
pos += 1; returned += 1;
}
}
else
{
moarDatas.Reset();
moarDatas.WaitOne();
}
}
}
#region Other Stream Members
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return false; }
}
public override void Flush() { }
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
#endregion
}
public class Program
{
public static void Main(string[] args)
{
Test();
Console.ReadLine();
}
public static async void Test()
{
var stream = new LongLivedTextStream();
var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });
var t = Task.Run(() =>
{
stream.Inject("<root>");
Thread.Sleep(2000);
stream.Inject("value");
Thread.Sleep(2000);
stream.Inject("</root>");
Thread.Sleep(2000);
reader.Dispose(); // InvalidOperationException: "An asynchronous operation is already in progress."
Console.WriteLine("Disposed");
});
while (await reader.ReadAsync())
{
bool kill = false;
switch (reader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("Start: " + reader.LocalName);
break;
case XmlNodeType.EndElement:
Console.WriteLine("End: " + reader.LocalName);
//kill = true; // I could use a particular EndElement as a signal to not try another read
break;
case XmlNodeType.Text:
Console.WriteLine("Text: " + await reader.GetValueAsync());
break;
}
if (kill) { break; }
}
}
}
}
编辑
此示例使用实际的NetworkStream 并显示如果我在底层流中Close() 或Dispose() 对XmlReader 的ReadAsync() 调用不会像希望的那样返回false,而是继续阻塞。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace Example
{
public class Program
{
public static void Main(string[] args)
{
NetworkStream stream = null;
var endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 50000);
var serverIsUp = new ManualResetEvent(false);
var doneWriting = new ManualResetEvent(false);
var t1 = Task.Run(() =>
{
var server = new TcpListener(endpoint);
server.Start();
serverIsUp.Set();
var client = server.AcceptTcpClient();
var writer = new StreamWriter(client.GetStream());
writer.Write("<root>"); writer.Flush();
Thread.Sleep(2000);
writer.Write("value"); writer.Flush();
Thread.Sleep(2000);
writer.Write("</root>"); writer.Flush();
Thread.Sleep(2000);
doneWriting.Set();
});
var t2 = Task.Run(() =>
{
doneWriting.WaitOne();
stream.Dispose();
Console.WriteLine("Disposed of Stream");
});
var t3 = Task.Run(async () =>
{
serverIsUp.WaitOne();
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
socket.Connect(endpoint);
stream = new NetworkStream(socket, true);
var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });
bool val;
while (val = await reader.ReadAsync())
{
bool kill = false;
switch (reader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("Start: " + reader.LocalName);
break;
case XmlNodeType.EndElement:
Console.WriteLine("End: " + reader.LocalName);
//kill = true; // I could use a particular EndElement as a signal to not try another read
break;
case XmlNodeType.Text:
Console.WriteLine("Text: " + await reader.GetValueAsync());
break;
}
if (kill) { break; }
}
// Ideally once the underlying stream is closed, ReadAsync() would return false
// we would get here and could safely dispose the reader, but that's not the case
// ReadAsync() continues to block
reader.Dispose();
Console.WriteLine("Disposed of Reader");
});
Console.ReadLine();
}
}
}
【问题讨论】:
-
为什么不能关闭流/套接字,并在
ReadAsync返回false时处理XmlReader? -
不幸的是,关闭底层流似乎不会导致
XmlReader从ReadAsync()返回false,而是继续阻塞。请参阅编辑中的第二个示例。