【问题标题】:Read and write from a Socket simultaneously in RxJava在 RxJava 中同时从 Socket 读取和写入
【发布时间】:2016-04-03 17:24:40
【问题描述】:

我是一名 RxJava 新手,试图完成以下简单任务:

  • 通过套接字连接到定期发送数据的服务器
  • 在数据到达时读取数据
  • 如果套接字可写,则每隔n 毫秒向服务器写入一条心跳消息

我编写了工作 RxJava 程序来“连接和读取”和“连接和写入”,但现在我想编写一个同时执行这两种操作的程序。我有一个连接到套接字的函数:

Observable<SocketChannel> connect(String host,int port) { ...

...从中读取的函数

Observable<byte[]> readFromSocket(SocketChannel s) {

...和一个测试可写性的函数(它在可写时返回套接字)

Observable<SocketChannel> whenWritable(SocketChannel s) {

所以我的方法如下所示:

connect("localhost", 31337).forEach(socket -> {
    readFromSocket(socket)
        .forEach(bytes -> printf("read %d bytes",bytes.length);
    Observable
        .interval(2500, TimeUnit.MILLISECONDS)
        .flatMap(ignore -> whenWritable(socket))
        .forEach(ignore -> println("write heart beat"));
    });

我运行程序时,“read n bytes”的消息有规律的出现,但是心跳没有写出来。

但是,以下程序可以工作:

connect("localhost", 31337)
    .flatMap(s -> 
        Observable
            .interval(2500, TimeUnit.MILLISECONDS)
            .flatMap(ignore -> whenWritable(s)))
    .forEach(ignore -> println("write heart beat"));

这里有什么问题,我在 RxJava 中的读写习惯是什么?

【问题讨论】:

    标签: java sockets rx-java reactive-programming


    【解决方案1】:

    可能是因为readFromSocket(socket)是同步的,那么readFromSocket(socket).forEach会阻塞,后面的代码就不能运行了。你可以在readFromSocket(socket).forEach后面加一个日志来查看。

    要解决此问题,您可以使用readFromSocket(socket).subscribeOn(Schedulers.io()) 在 IO 线程池中运行读取操作。

    【讨论】:

    • 原来如此,使用subscribeOn 解决了这个问题,谢谢!
    猜你喜欢
    • 1970-01-01
    • 2012-02-25
    • 1970-01-01
    • 1970-01-01
    • 2017-06-22
    • 2013-01-06
    • 1970-01-01
    • 1970-01-01
    • 2013-12-17
    相关资源
    最近更新 更多