【发布时间】:2013-11-28 20:50:55
【问题描述】:
经过相当多的思考和在互联网上翻找,我想出了(我认为是)将 TcpIp 转换为 Observable 数据流的非常漂亮的封装:
public static IObservable<string> CreateFromTcpIp(IPAddress ip, int port)
{
return
Observable.Using(
() => new TcpClient(), //create client
client => client.ConnectObservable(ip, port) //connect async
.Select(_ => client.GetStream()) // get stream
.Select(stream => new BinaryReader(stream)) // get reader
.SelectMany(reader => reader.ToObservable()) // see below
.Retry() // if error, retry
)
.Publish().RefCount(); //only subscribe once for all subscribers
}
其中ConnectObservable 就是client.ConnectAsync(host, port).ToObservable(),而reader.ToObservable 看起来像这样:
private static IObservable<string> ToObservable(this BinaryReader source)
{
return Observable.Create<string>(
observer =>
{
return Observable.Generate(
source,
reader => true, //just keep on going
reader => reader,
reader => reader.ReadString())
.Subscribe(observer);
});
}
我面临的问题是如何进行测试。我不想在调用堆栈中一直实现/包装接口。
我一直在尝试在我的测试类中使用 TcpListener 创建一个“服务器”,但是遇到了各种(可能不相关的)问题,让订阅和服务共存(订阅我的 CreateFromTcpIp 的结果似乎永远不会返回 IDisposable,在我的测试中插入延迟似乎最终会锁定程序集,因此我无法重建)。
是否有我遗漏的替代方法?
EDIT 订阅不返回的问题与对有故障的 Observable 进行重试有关。我问了另一个问题here。
【问题讨论】:
-
您是否考虑过使用 Subject
(或 ISubject)而不是 IObservable ?它非常适合单元测试。 -
首先,取出
Retry,直到完成单元测试。其次,将单元测试代码发布在您尝试设置TcpListener的位置以及您尝试针对它测试您的方法的位置。 -
所以......可能只是我,但这闻起来不香吗?我的意思是,这里似乎有很多假设......就像,如果流错误,我们应该永远重试。
-
如果它也没有出错...... BinaryReader ToObservable 如果流完成会抛出错误,它将尝试读取流的末尾。
-
@JamesWorld,我想我已经找到了。在 Tcp 没有响应的情况下,是 ConnectAsync(或背后的 ToObservable)没有“告诉我”错误(它只在我 unsubscribe 时告诉我 - 没有帮助)。在它响应的情况下,我需要做一个
SubscribeOn来释放主线程,以便它可以将订阅返回给我。感谢您的帮助!
标签: c# system.reactive tcpclient