【问题标题】:RxJava - How to create a server for long pollingRxJava - 如何为长轮询创建服务器
【发布时间】:2015-09-17 13:58:17
【问题描述】:

我有一个集群中有多个节点的应用程序。每个节点都将日志文件写入其本地磁盘。我已经实现了一个日志搜索功能,可以在每个节点上搜索日志。收到浏览器搜索请求的所有者节点将日志搜索作业提交给其他节点,然后其他节点将搜索结果传递给原始节点。客户端 Web 浏览器使用长轮询从节点获取搜索结果。这似乎很适合 RxJava,因为每个节点都是一个事件流,客户端从所有节点获取一个合并的事件流。 (假设一个小气的运营团队不会让我们使用 Splunk 或其他一些商业日志记录解决方案)。

客户端轮询原始节点上的 REST API,该节点收集搜索结果。我对 REST API 的理想逻辑如下:

  • 服务器应让客户端最多等待 15 秒以等待响应。
  • 如果在 15 秒内未生成任何结果,则响应可能为空。客户端将看到这一点并发送新的轮询请求。
  • 如果没有更多结果(即搜索完成),请向客户端发送特殊响应,指示其不再轮询。
  • 如果生成了结果,则最多再等待 100 毫秒以获得额外结果,以节省额外轮询的网络开销。
  • 客户端不应获得任何重复的结果。

我编写了以下示例代码来模拟这种情况:

public static void main(String[] args) throws Exception {
    ExecutorService executorService = Executors.newFixedThreadPool(5);

    /* Each searchTask represents the results of a search job running on a
     * node in the cluster */
    Subject<String,String> searchTask1 = PublishSubject.create();
    Subject<String,String> searchTask2 = PublishSubject.create();

    // Limit max number of search results
    Observable<String> searchResults = 
        Observable.merge(searchTask1, searchTask2).take( 1000 );

    /* Add a 100ms buffer window to collect nearby responses together.  
     * Filter out any empty buffers to eliminate unnecessary
     * responses to the browser. */
    BlockingObservable<List<String>> blocking = 
        searchResults.buffer(100, TimeUnit.MILLISECONDS)
            .filter(results -> !results.isEmpty()).toBlocking();
    Iterator<List<String>> it = blocking.getIterator();

    /* Each call to searchTask.onNext represents a search result pushed
     * to the owner node from another node.  This code would be called 
     * from the REST endpoint. */
    executorService.submit( () -> {
        searchTask1.onNext("1");
        try { Thread.sleep(1200); } catch ( Exception ignored ) { }
        searchTask1.onNext("2");
        searchTask1.onCompleted();
    });
    executorService.submit( () -> {
        searchTask2.onNext("a");
        try { Thread.sleep(500); } catch ( Exception ignored ) { }
        searchTask2.onNext("b");
        searchTask2.onCompleted();
    });

    executorService.submit( () -> {
        /* Each iteration of this loop represents a polling request from
         * the browser and the results that are sent back to it. */
        for ( int i = 0; i < 5; i++ ) { 
            it.forEachRemaining(results -> System.out.println(results));
        }
    });

    Thread.sleep(1500);
    System.out.println("exit");
}

for 循环中的逻辑应该是什么,以确保始终在最多 15 秒后将响应发送回客户端(即使响应为空)?

编辑:我已经用更多 cmets 更新了示例代码并展示了我当前的解决方案,但我仍然无法获得我正在寻找的 15 秒的最大响应时间。我们有网络设备会关闭一个闲置时间过长的 HTTP 连接,所以我想保证客户端总是在最多 15 秒后得到响应。

【问题讨论】:

  • 能否详细说明,比如收到部分结果时如何轮询进一步的结果,什么是300ms超时?
  • @zsxwing 我已经用更多细节更新了示例代码。
  • 不是你的问题的答案,但你看过Kibana吗?

标签: java reactive-programming rx-java


【解决方案1】:

我能找到的所有 RxJava 文章似乎都非常关注客户端代码而不是服务器端代码。但是,在与 Observable 运算符争论了一会儿之后,我想出了以下解决方案,这与我想要的很接近。心跳每 15 秒发生一次,而不是在上一个结果之后的 15 秒发生一次。这意味着服务器可能会在发送结果后立即向客户端发送心跳响应,但这对我来说已经足够接近了。

我创建了一个 15s 间隔的 observable 并将它与我已经拥有的 searchResults observable 合并。我使用了一个主题,以便我可以在结果流停止时停止可观察的间隔(否则它将无限期地继续下去)。

    /* Add a heartbeat that ensures we don't wait too long between
     * sending responses and some network device kills our connection */
    final Observable<String> heartbeat =
        Observable.interval( 1, TimeUnit.SECONDS ).map(el -> "heartbeat");
    final PublishSubject<String> stopHeartbeat = PublishSubject.create();
    searchResults.subscribe( el -> {}, ex -> {}, () -> stopHeartbeat.onNext( null ) );
    final Observable<String> searchResultsWithHeartbeat =
        searchResults.mergeWith( heartbeat.takeUntil( stopHeartbeat ) );

【讨论】:

    猜你喜欢
    • 2013-05-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-10
    • 2012-08-21
    • 2012-06-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多