【问题标题】:RxJava: Feed one stream (Observable) as the input of another streamRxJava:将一个流(可观察)作为另一个流的输入
【发布时间】:2015-10-24 17:38:18
【问题描述】:

我还在学习 RxJava。在另一个流中使用流的最佳方式是什么?还是违反了响应式编程的原则?

我正在尝试编写的一个玩具示例包括一个 TCP 客户端和一个发回大写输入的服务器。我想从标准输入中获取输入,将其发送到服务器并打印出客户端和服务器接收到的所有内容。

以下是程序的预期输出:

(User input) apple
Server received: apple
Client received: APPLE
(User input) peach
Server received: peach
Client received: PEACH

我能够使用三个 observables 来实现这一点:

  • stdinStream 从标准输入发出字符串,
  • serverStream 发出服务器接收到的字符串
  • clientStream 发出客户端接收到的字符串。

然后从clientStream 的创建中订阅inputStream,如下所示:

    private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) {
    return Observable.create(sub -> {
        try (Socket socket = new Socket(host, port);
             BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
             PrintWriter outWriter = new PrintWriter(outputStream, true);
        ) {
            inputStream.subscribe(line -> {
                outWriter.println(line);
                try {
                    sub.onNext(inFromServer.readLine());
                } catch (IOException e) {
                    sub.onError(e);
                }
            });
        } catch (UnknownHostException e) {
            sub.onError(e);
        } catch (IOException e) {
            sub.onError(e);
        }
    });
}

注意:我不想创建多个客户端,而宁愿让单个客户端运行并指示它根据输入向服务器发送不同的值。因此,不需要将输入映射到新的 clientStream 的方法:

stdinStream.map(line -&gt; createClientStream(line))

所以我的问题是:

  1. 这是使用 RxJava 的合理方式吗?有更好的选择吗?
  2. 作为clientStream 创建的一部分,我创建了客户端套接字。我这样做是为了可以轻松地使用调度程序异步运行它,clientStream.scheduleOn(Schedulers.newThread)。考虑到我的单一客户需求,也许我应该采取不同的做法?

这里是完整的代码:https://gist.github.com/lintonye/25af58abdfcc688ad3c3

【问题讨论】:

    标签: reactive-programming rx-java


    【解决方案1】:

    你需要的是using。将所有与套接字相关的对象放入Connection 类并给定输入序列,将其映射到一对println/readLine,同时保持单个连接。这是gist for a runnable example

    static class Connection {
        Socket socket;
        BufferedReader inFromServer;
        DataOutputStream outputStream;
        PrintWriter outWriter;
    
        public Connection(String host, int port) {
            try {
                socket = new Socket(host, port);
                inFromServer = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
                outputStream = new DataOutputStream(socket.getOutputStream());
                outWriter = new PrintWriter(outputStream, true);
            } catch (IOException ex) {
                Exceptions.propagate(ex);
            }
        }
    
        public void close() {
            try {
                outWriter.close();
                outputStream.close();
                inFromServer.close();
                socket.close();
            } catch (IOException ex) {
                Exceptions.propagate(ex);
            }
        }
    }
    
    public static void main(String[] args) {
        runServer();
    
        Observable<String> source = Observable.just("a", "b", "c");
    
        String host = "localhost";
        int port = 8080;
    
        Observable.<String, Connection>using(() -> new Connection(host, port), 
        conn -> 
            source
            .map(v -> {
                conn.outWriter.println(v);
                try {
                    return conn.inFromServer.readLine();
                } catch (IOException ex) {
                    throw Exceptions.propagate(ex);
                }
            })
        , Connection::close)
        .subscribe(System.out::println);
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多