【发布时间】:2015-09-17 13:58:17
【问题描述】:
我有一个集群中有多个节点的应用程序。每个节点都将日志文件写入其本地磁盘。我已经实现了一个日志搜索功能,可以在每个节点上搜索日志。收到浏览器搜索请求的所有者节点将日志搜索作业提交给其他节点,然后其他节点将搜索结果传递给原始节点。客户端 Web 浏览器使用长轮询从节点获取搜索结果。这似乎很适合 RxJava,因为每个节点都是一个事件流,客户端从所有节点获取一个合并的事件流。 (假设一个小气的运营团队不会让我们使用 Splunk 或其他一些商业日志记录解决方案)。
客户端轮询原始节点上的 REST API,该节点收集搜索结果。我对 REST API 的理想逻辑如下:
- 服务器应让客户端最多等待 15 秒以等待响应。
- 如果在 15 秒内未生成任何结果,则响应可能为空。客户端将看到这一点并发送新的轮询请求。
- 如果没有更多结果(即搜索完成),请向客户端发送特殊响应,指示其不再轮询。
- 如果生成了结果,则最多再等待 100 毫秒以获得额外结果,以节省额外轮询的网络开销。
- 客户端不应获得任何重复的结果。
我编写了以下示例代码来模拟这种情况:
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(5);
/* Each searchTask represents the results of a search job running on a
* node in the cluster */
Subject<String,String> searchTask1 = PublishSubject.create();
Subject<String,String> searchTask2 = PublishSubject.create();
// Limit max number of search results
Observable<String> searchResults =
Observable.merge(searchTask1, searchTask2).take( 1000 );
/* Add a 100ms buffer window to collect nearby responses together.
* Filter out any empty buffers to eliminate unnecessary
* responses to the browser. */
BlockingObservable<List<String>> blocking =
searchResults.buffer(100, TimeUnit.MILLISECONDS)
.filter(results -> !results.isEmpty()).toBlocking();
Iterator<List<String>> it = blocking.getIterator();
/* Each call to searchTask.onNext represents a search result pushed
* to the owner node from another node. This code would be called
* from the REST endpoint. */
executorService.submit( () -> {
searchTask1.onNext("1");
try { Thread.sleep(1200); } catch ( Exception ignored ) { }
searchTask1.onNext("2");
searchTask1.onCompleted();
});
executorService.submit( () -> {
searchTask2.onNext("a");
try { Thread.sleep(500); } catch ( Exception ignored ) { }
searchTask2.onNext("b");
searchTask2.onCompleted();
});
executorService.submit( () -> {
/* Each iteration of this loop represents a polling request from
* the browser and the results that are sent back to it. */
for ( int i = 0; i < 5; i++ ) {
it.forEachRemaining(results -> System.out.println(results));
}
});
Thread.sleep(1500);
System.out.println("exit");
}
for 循环中的逻辑应该是什么,以确保始终在最多 15 秒后将响应发送回客户端(即使响应为空)?
编辑:我已经用更多 cmets 更新了示例代码并展示了我当前的解决方案,但我仍然无法获得我正在寻找的 15 秒的最大响应时间。我们有网络设备会关闭一个闲置时间过长的 HTTP 连接,所以我想保证客户端总是在最多 15 秒后得到响应。
【问题讨论】:
-
能否详细说明,比如收到部分结果时如何轮询进一步的结果,什么是300ms超时?
-
@zsxwing 我已经用更多细节更新了示例代码。
-
不是你的问题的答案,但你看过Kibana吗?
标签: java reactive-programming rx-java