【发布时间】:2015-10-24 17:38:18
【问题描述】:
我还在学习 RxJava。在另一个流中使用流的最佳方式是什么?还是违反了响应式编程的原则?
我正在尝试编写的一个玩具示例包括一个 TCP 客户端和一个发回大写输入的服务器。我想从标准输入中获取输入,将其发送到服务器并打印出客户端和服务器接收到的所有内容。
以下是程序的预期输出:
(User input) apple
Server received: apple
Client received: APPLE
(User input) peach
Server received: peach
Client received: PEACH
我能够使用三个 observables 来实现这一点:
-
stdinStream从标准输入发出字符串, -
serverStream发出服务器接收到的字符串 -
clientStream发出客户端接收到的字符串。
然后从clientStream 的创建中订阅inputStream,如下所示:
private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) {
return Observable.create(sub -> {
try (Socket socket = new Socket(host, port);
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
PrintWriter outWriter = new PrintWriter(outputStream, true);
) {
inputStream.subscribe(line -> {
outWriter.println(line);
try {
sub.onNext(inFromServer.readLine());
} catch (IOException e) {
sub.onError(e);
}
});
} catch (UnknownHostException e) {
sub.onError(e);
} catch (IOException e) {
sub.onError(e);
}
});
}
注意:我不想创建多个客户端,而宁愿让单个客户端运行并指示它根据输入向服务器发送不同的值。因此,不需要将输入映射到新的 clientStream 的方法:
stdinStream.map(line -> createClientStream(line))
所以我的问题是:
- 这是使用 RxJava 的合理方式吗?有更好的选择吗?
- 作为
clientStream创建的一部分,我创建了客户端套接字。我这样做是为了可以轻松地使用调度程序异步运行它,clientStream.scheduleOn(Schedulers.newThread)。考虑到我的单一客户需求,也许我应该采取不同的做法?
这里是完整的代码:https://gist.github.com/lintonye/25af58abdfcc688ad3c3
【问题讨论】:
标签: reactive-programming rx-java