【问题标题】:RxJS/ReactiveX Proper modules communicationRxJS/ReactiveX 正确的模块通信
【发布时间】:2017-03-17 09:27:42
【问题描述】:

我对响应式编程很陌生,但已经爱上了。但是,仍然很难将我的大脑切换到它。我正在尝试遵循所有建议,如“避免使用主题”和“避免不纯函数”,当然还有“避免命令式代码”。

我发现很难实现的是简单的跨模块通信,其中一个模块可以注册“动作”/可观察,而另一个可以订阅并对其做出反应。一个简单的消息总线可能会起作用,但这将强制使用我试图避免的主题和命令式代码样式。

所以这是我正在玩的一个简单的起点:

    // some sandbox
class Api {
  constructor() {
    this.actions = {};
  }

  registerAction(actionName, action) {
    // I guess this part will have to be changed
    this.actions[actionName] = action.publishReplay(10).refCount();
    //this.actions[actionName].connect();
  }

  getAction(actionName) {
    return this.actions[actionName];
  }
}

const api = new Api();

// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
  console.log("EXECUTING");
  obs.next("42 " + Date.now());
  obs.complete();
});

api.registerAction("myAction", myAction);

let myTrigger = Rx.Observable.interval(1000).take(2);

let executedAction = myTrigger
.flatMap(x => api.getAction("myAction"))
.subscribe(
  (x) => { console.log(`executed action: ${x}`); },
  (e) => {}, 
  () => { console.log("completed");});

// -------------------------------------------------------------------
// module 2
api.getAction("myAction")
  .subscribe(
  (x) => { console.log(`SECOND executed action: ${x}`); },
  (e) => {}, 
  () => { console.log("SECOND completed");});

所以目前第二个模块订阅它“触发”了“myAction”Observable。在现实生活中,这可能是一个 ajax 调用。有什么方法可以让所有订阅者延迟/等待,直到从 module1 正确调用“myAction”?再一次 - 使用主题很容易做到这一点,但我正在尝试按照推荐的做法来做到这一点。

【问题讨论】:

  • "myAction" 是从模块 1 中正确调用的 是什么意思?你的意思是直到它完成还是什么?
  • 是的。完成将工作

标签: rxjs rxjs5 reactivex reactive-extensions-js


【解决方案1】:

如果我对您的理解正确,您希望确保,如果您调用 api.getAction,您希望该 observable 中的下一个值等到对 getAction 的调用完成。在处理其他值之前。

这是您可以使用 concatMap 轻松实现的。 ConcatMap 将采用一个返回 observable 的函数(在您的情况下是对 getAction 的调用)。 ConcatMap 将等待开始处理下一个值,直到函数中返回的 observable 完成。

因此,如果您像这样更改代码,它应该可以工作(如果我理解正确的话)。

let executedAction = myTrigger
.concatMap(x => api.getAction("myAction"))
.subscribe(
  (x) => { console.log(`executed action: ${x}`); },
  (e) => {}, 
  () => { console.log("completed");});

如果 myTrigger 有新值,则在 api.getAction 返回的 observable 完成之前不会对其进行处理。

【讨论】:

  • 谢谢。这是我正在考虑的改进。但是我的主要问题是来自 module2 的订阅在来自 module1 的订阅之前触发了发射。所以我想我需要某种延迟直到发出然后订阅?但就我进入文档而言,延迟可能只是时间(使用 rxjs5)
  • 因此您希望模块 1 的调用在处理来自模块 2 的调用之前完成。但问题是,如果是 AJAX 调用,您是希望调用执行两次还是希望来自第二个模块的调用触发第二个请求
  • 我只想从 module2 订阅只“收听”而不“触发”请求
  • 随着我深入挖掘,我认为实现这一目标的最佳方法是使用调度程序?定制一个。但不知道您是否可以创建自定义调度程序。
【解决方案2】:

所以这里有一个比我想象的更简单的解决方案。只需使用 2 个 observables。使用 schedulers 和 subscribeOn 也可以达到类似的效果。

    // some sandbox
class Action {
  constructor(name, observable) {
    this.name = name;
    this.observable = observable;
    this.replay = new Rx.ReplaySubject(10);
  }
}

function actionFactory(action, param) {

  return Rx.Observable.create(obs => {
    action.observable
    .subscribe(x => {
        obs.next(x);
        action.replay.next(x);
    }, (e) => {}, () => obs.complete);
  }); 
}

class Api {
  constructor() {
    this.actions = {};
  }

  registerAction(actionName, action) {
    let generatedAction = new Action(actionName, action);

    this.actions[actionName] = generatedAction;

    return actionFactory.bind(null, generatedAction);
  }

  getAction(actionName) {
    return this.actions[actionName].replay;
  }
}

const api = new Api();

// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
  obs.next("42 " + Date.now());
  obs.complete();
});

let myRegisteredAction$ = api.registerAction("myAction", myAction);

let myTrigger = Rx.Observable.interval(1000).take(1).delay(1000);

let executedAction = myTrigger
.map(x => { return { someValue: x} })
.concatMap(x => myRegisteredAction$(x))
.subscribe(
  (x) => { console.log(`MAIN: ${x}`); },
  (e) => { console.log("error", e)}, 
  () => { console.log("MAIN: completed");});


// -------------------------------------------------------------------
// module 2
 var sub = api.getAction("myAction")
  .subscribe(
  (x) => { console.log(`SECOND: ${x}`); },
  (e) => {console.log("error : " + e)}, 
  () => { console.log("SECOND: completed");});

【讨论】:

    猜你喜欢
    • 2017-08-02
    • 2016-08-25
    • 2020-07-15
    • 1970-01-01
    • 1970-01-01
    • 2017-03-22
    • 1970-01-01
    • 2017-02-20
    • 1970-01-01
    相关资源
    最近更新 更多