【发布时间】:2016-04-03 17:24:40
【问题描述】:
我是一名 RxJava 新手,试图完成以下简单任务:
- 通过套接字连接到定期发送数据的服务器
- 在数据到达时读取数据
- 如果套接字可写,则每隔
n毫秒向服务器写入一条心跳消息
我编写了工作 RxJava 程序来“连接和读取”和“连接和写入”,但现在我想编写一个同时执行这两种操作的程序。我有一个连接到套接字的函数:
Observable<SocketChannel> connect(String host,int port) { ...
...从中读取的函数
Observable<byte[]> readFromSocket(SocketChannel s) {
...和一个测试可写性的函数(它在可写时返回套接字)
Observable<SocketChannel> whenWritable(SocketChannel s) {
所以我的方法如下所示:
connect("localhost", 31337).forEach(socket -> {
readFromSocket(socket)
.forEach(bytes -> printf("read %d bytes",bytes.length);
Observable
.interval(2500, TimeUnit.MILLISECONDS)
.flatMap(ignore -> whenWritable(socket))
.forEach(ignore -> println("write heart beat"));
});
我运行程序时,“read n bytes”的消息有规律的出现,但是心跳没有写出来。
但是,以下程序可以工作:
connect("localhost", 31337)
.flatMap(s ->
Observable
.interval(2500, TimeUnit.MILLISECONDS)
.flatMap(ignore -> whenWritable(s)))
.forEach(ignore -> println("write heart beat"));
这里有什么问题,我在 RxJava 中的读写习惯是什么?
【问题讨论】:
标签: java sockets rx-java reactive-programming