【问题标题】:RxJs: How to only maintain the latest value until inner observable completeRxJs:如何只保持最新值直到内部可观察完成
【发布时间】:2018-07-02 23:44:13
【问题描述】:

我是 RxJs 新手,无法以“RxJs 方式”实现这一目标:

无限流a$ 会偶尔发出一个值a

async() 接受a 并执行异步操作。

如果a$async 处于挂起状态时发出值,则只保留最新的al

前一个async完成后,如果有al,运行async(al)

等等。

a$:----a1----------a2----a3-----------------------a4-----------
       async(a1):------------end                  async(a4):---
                             async(a3):-----end

这是我想出来的,有点讨厌:

var asyncIdle$ = new Rx.BehaviorSubject()
var asyncRunning$ = new Rx.Subject()
var async$ = asyncIdle$

function async (val) {
  async$ = asyncRunning$
  // do something with val
  console.log(val + ' handling')
  setTimeout(() => {
    console.log(val + ' complete')
    async$.next()
    async$ = asyncIdle$
  }, 2000)
}

// simulate a$
var a$ = Rx.Observable.fromEvent(document, 'click')
.mapTo(1)
.scan((acc, curr) => acc + curr)
.do(val => console.log('got ' + val))


a$.debounce(() => async$)
.subscribe(val => {
  async(val)
})

【问题讨论】:

标签: javascript rxjs observable rxjs5 reactive


【解决方案1】:

使用first()repeat() 的组合。如果a$ 完成发射,则序列完成

//emit every 1s
const a$=new Rx.BehaviorSubject(0)
Rx.Observable.interval(1000).take(100).skip(1).subscribe(a$);

// //simulate aysnc 
const async = (val)=>{
  console.log('async start with:'+ val)
  return Rx.Observable.timer(5100).mapTo('async done:'+val);
}

a$.first().switchMap(value=>async(value))
  .repeat()
  .catch(e=>Rx.Observable.empty())
  .subscribe(console.log,console.err,console.warn)

a$.subscribe(console.warn)

https://jsbin.com/tohahod/65/edit?js,console

【讨论】:

  • 即使 a$ 没有发出新值,它不会永远重复运行吗?
  • 我的意思是async
  • 如果你的 a$ 是热可观察的,a$.take(1) 只会给你最新的,如果没有它不会发出。类似 Observable.fromEvent('click', a).take(1) 如果没有点击就不会发射
  • 哦,我的意思是当 a$ 停止发出新值时,async 不会继续运行吗?
  • 也许我可以测试一下
【解决方案2】:

我在打字稿中提出了这个解决方案:

我有一个简单的Gate 类,可以打开或关闭:

enum GateStatus {
  open = "open",
  closed = "closed"
}

class Gate {
  private readonly gate$: BehaviorSubject<GateStatus>;

  readonly open$: Observable<GateStatus>;
  readonly closed$: Observable<GateStatus>;

  constructor(initialState = GateStatus.open) {
    this.gate$ = new BehaviorSubject<GateStatus>(initialState);
    this.open$ = this.gate$
      .asObservable()
      .pipe(filter(status => status === GateStatus.open));
    this.closed$ = this.gate$
      .asObservable()
      .pipe(filter(status => status === GateStatus.closed));
  }

  open() {
    this.gate$.next(GateStatus.open);
  }
  close() {
    this.gate$.next(GateStatus.closed);
  }
}

算子函数很简单。一开始大门是敞开的。在开始请求之前,我们将其关闭,当请求完成后,我们再次打开它。
audit() 只会在门打开时让最新的请求数据通过。

export const requestThrottle = <T>(
  requestHandlerFactory: (requestData: T) => Observable<any>
) => (requestData: Observable<T>) => {
  const gate = new Gate();
  return requestData.pipe(
    audit(_ => gate.open$),
    // NOTE: when the order is important, use concatMap() instead of mergeMap()
    mergeMap(value => {
      gate.close();
      return requestHandlerFactory(value).pipe(finalize(() => gate.open()));
    })
  );
};

像这样使用它:

src.pipe(
    requestThrottle(() => of(1).pipe(delay(100)))
);

code exmaple on stackblitz

【讨论】:

    【解决方案3】:

    您可以使用audit operator 来解决问题,像这样(cmets 应该解释它是如何工作的):

    // Simulate the source.
    
    const source = Rx.Observable.merge(
      Rx.Observable.of(1).delay(0),
      Rx.Observable.of(2).delay(10),
      Rx.Observable.of(3).delay(20),
      Rx.Observable.of(4).delay(150),
      Rx.Observable.of(5).delay(300)
    ).do(value => console.log("source", value));
    
    // Simulate the async task.
    
    function asyncTask(value) {
      return Rx.Observable
        .of(value)
        .do(value => console.log(" before async", value))
        .delay(100)
        .do(value => console.log(" after async", value));
    }
    
    // Compose an observable that's based on the source.
    // Use audit to ensure a value is not emitted until
    // the async task has been performed.
    // Use share so that the signal does not effect a
    // second subscription to the source.
    
    let signal;
    
    const audited = source
      .audit(() => signal)
      .mergeMap(value => asyncTask(value))
      .share();
    
    // Compose a signal from the audited observable to
    // which the async task is applied.
    // Use startWith so that the first emitted value
    // passes the audit.
    
    signal = audited
      .mapTo(true)
      .startWith(true);
    
    audited.subscribe(value => console.log("output", value));
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    &lt;script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"&gt;&lt;/script&gt;

    【讨论】:

    • 也许我误解了这个问题,但我认为这个解决方案是不正确的。例如当我执行代码时,我看到这个序列:before async 3source 4before async 4after async 3。我认为应该跳过异步调用4,因为异步3仍在进行中,对吧?
    • @TmTron 我也注意到了。来自audit 的最新值不会通过audit,这就是为什么下一个新值会被重新试听。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-07-11
    • 2022-12-18
    • 2020-06-24
    • 2018-06-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多