【问题标题】:How to compose streams\Observables correctly如何正确组合流\Observables
【发布时间】:2014-12-20 13:08:18
【问题描述】:

我在理解如何在具有不同返回类型的流\Observables 之间进行组合时遇到概念性问题。

这是我正在尝试编码的草稿方法:

public void findSeat() {
    rx.Observable<GameObject> userObs = context.getUser();
    rx.Observable<ActiveGame> gameObs = context.findGame();

    rx.Observable.zip(userObs, gameObs, (userObj, game) -> {

        User user = ...;

        final List<Object> results = new ArrayList<Object>(3); 

        if(userObj.getStatus() != ErrorCodes.STATUS_OK) {
            results.add(-1);
            return results;
        }

        ...
        ...

        //***********************************
        // THE PROBLEM IS HERE: 
        // "context.getActiveGameManager().updateGame(game)" returns Observable<GameOBject> and not List<Object> like .zip() expects.
        // because of that I cannot do:
        // "return context.getActiveGameManager().updateGame(game);"
        // How can I do this convertion from Observable<GameObject> to List<Object>
        //************************************

        context.getActiveGameManager().updateGame(game)
            .map((gameObj) -> {

                if(gameObj.getStatus() != ErrorCodes.STATUS_OK) {
                    results.add(-2);
                    return (Observable<? extends Object>) results;
                }

                results.add(ErrorCodes.STATUS_OK);
                results.add(user);
                results.add(gameObj);
                return gameObs;
        });

        return Observable.empty();

    }).subscribe((results) -> {

        int status = (int) results.get(0);
        User user = (User) results.get(1);
        ActiveGame game = (ActiveGame) results.get(2);


        replyObj.reply(new JsonObject()
                    .putString("action", CommandActions.FIND_SEAT)
                    .putNumber("status", status);
                    .putNumber("game_id", game.getGameId())
                );

    });
}

流程如下: 1. 使用 .zip 方法发出 2 个 Observable。 2. 对流的返回值做一些逻辑,如果它导致错误代码 --> 将其放入列表中并返回,以便“订阅”可以将错误返回给用户。 3. 如果没有错误,使用 flatMap() 发出另一个“更新”方法——这就是我的问题所在。 4. 最终,所有的结果都应该在“订阅”中处理,因为这是我承认用户关于他的请求的一点。

希望它足够清楚......

顺便说一句,我正在尝试学习 rxJava,但我很难找到足够好的资源 - 有人可以向我推荐最好的学习方法吗?我尝试在 Youtube、Wikipedia、Github 上查看教程......其中大多数使用 Scala 和其他脚本语言进行教学 - 在 Java 中找不到任何内容。

感谢所有努力理解它的人!

【问题讨论】:

    标签: java system.reactive reactive-programming rx-java


    【解决方案1】:

    我想你已经快到了,但尝试将 .zip lambda 中的代码分解为更小的 Rx 操作。例如:

    rx.Observable
        .zip(userObs, gameObs, (userObj, game) -> {
            // Combine the user & game objects and pass them to the
            // next Rx operation.
            return new UserAndActiveGame(userObj, game);
        })
        .filter(userAndActiveGame -> {
            // Remove this filter if you want errors to make it to the subscriber.
            return userAndActiveGame.getUserObj().getStatus() == ErrorCodes.STATUS_OK;
        })
        .flatMap(userAndActiveGame -> {
            // Remove this check if you filter errors above.
            if (userAndActiveGame.getUserObj().getStatus() != ErrorCodes.STATUS_OK) {
                return Observable.just(new FindSeatResult(-1));
            }
    
            return context.getActiveGameManager().updateGame(userAndActiveGame.getGame())
                .map(gameObj -> {
                    if (gameObj.getStatus() != ErrorCodes.STATUS_OK) {
                        return new FindSeatResult(-2);
                    }
    
                    User user =...; // Whatever you are doing to get this in your example code.
                    return new FindSeatResult(ErrorCodes.STATUS_OK, user, gameObj);
                });
        })
    

    以下类用于传递中间结果和最终结果:

    private class UserAndActiveGame {
        private final GameObject userObj;
        private final ActiveGame game;
    
        public UserAndActiveGame(GameObject userObj, ActiveGame game) {
            this.userObj = userObj;
            this.game = game;
        }
    
        public GameObject getUserObj() {
            return userObj;
        }
    
        public ActiveGame getGame() {
            return game;
        }
    }
    
    private class FindSeatResult {
        private final int status;
        private final User user;
        private final ActiveGame game;
    
        public FindSeatResult(int status) {
            this(status, null, null);
        }
    
        public FindSeatResult(int status, User user, ActiveGame game) {
            this.status = status;
            this.user = user;
            this.game = game;
        }
    
        public User getUser() {
            return user;
        }
    
        public int getStatus() {
            return status;
        }
    
        public ActiveGame getGame() {
            return game;
        }
    }
    

    然后,您的订阅者将使用与您已经在做的类似的打包结果。

    .subscribe((results) -> {
        // You don't need this if you filter errors above.
        if (findSeatResult.getStatus() == -1) {
            return;
        }
    
        int status = findSeatResult.getStatus();
        User user = findSeatResult.getUser();
        ActiveGame game = findSeatResult.getGame();
    
        replyObj.reply(new JsonObject()
                    .putString("action", CommandActions.FIND_SEAT)
                    .putNumber("status", status);
                    .putNumber("game_id", game.getGameId())
                );
    });
    

    通过使用中间和最终结果类而不是在 List&lt;Object&gt; 中传递您的结果,您的代码对更改更加宽容,编译器将为您检查所有内容。

    【讨论】:

    • 哇 - 感谢您的回答 - 我正在查看您的回复,基本上是在尝试理解它。谢谢!!
    • 因为我在理解如何做到这一点时遇到了问题,所以我正要使用嵌套的 .zip() ......但你的解决方案对我来说看起来很完美!谢谢。
    • 关于.filter,如果条件返回true-->继续下一阶段,如果'false'-->比什么?它打破了链条,就是这样??有没有办法让它去'订阅'或'错误'方法向用户发送回复。
    • 问题:假设它比上面的例子更复杂,并且我需要进行更多的 rx 调用——所以如果我没记错的话,我需要为将参数从一个 rx 调用传递给另一个 - 对吧?
    • 对于过滤器,为您想要继续处理的任何项目返回 true。返回 false 将丢弃该项目。由于听起来您希望错误继续存在,因此您可以完全删除 .filter 操作。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-22
    • 2013-03-01
    • 2020-09-30
    相关资源
    最近更新 更多