【问题标题】:In Rxjs How to pause a stream of data and continue from the second It was paused?在 Rxjs 中如何暂停一个数据流并从第二个继续它被暂停了?
【发布时间】:2021-02-19 00:40:30
【问题描述】:

我有一个数据流,每 10 秒轮询一次。长 pol 可以暂停继续

问题是:当它继续时它又重新开始。我必须让它从第二个开始继续它被暂停。

如果:

延迟(10000)

在 (7000) 处暂停

然后当我再次开始长轮询时,我的数据必须在 (10000-7000)=3000

之后提取

今天是第四天我一直被这个问题困住并且发疯了..

我已经创建了这个闪电战

https://stackblitz.com/edit/ang7-working-rxjs-stream-timer-combined?file=src/app/app.component.ts

创建了一个流和一个提取数据的计时器。当流和计时器停止时,设置剩余的时间以及新的延迟。但是当我按下开始时它不会再等待 3 秒它只是立即调用我不想要的服务

【问题讨论】:

标签: javascript angular rxjs reactive rxjs6


【解决方案1】:

这是一个非常基本的实现,它依赖于 timer 函数和 takefiltertakeUntil 运算符。

在我看来,解决方案不太优雅。它依赖于状态变量pausedfirst,并且在订阅的next 回调中有一个递归触发器。另请注意,它目前不进行任何错误处理。在这种情况下,轮询完全停止。

控制器

export const REFRESH_INTERVAL = 11;

@Component({ ... })
export class AppComponent implements OnDestroy {
  pause$ = new Subject();
  reset$ = new Subject();
  closed$ = new Subject();
  counter = REFRESH_INTERVAL;
  res: any;
  paused = false;

  constructor(private freeApiService: freeApiService) {}

  ngOnInit() {
    this.init();
  }

  init() {
    let first = true;
    this.reset$
      .pipe(
        switchMap(_ =>
          timer(0, 1000).pipe(
            take(this.counter),
            tap(_ => (this.paused = this.paused ? false : this.paused)),
            takeUntil(this.pause$),
            filter(_ => first || --this.counter === 0),
            finalize(
              () => (this.paused = this.counter != 0 ? true : this.paused)
            ),
            switchMap(_ => this.freeApiService.getDummy())
          )
        ),
        takeUntil(this.closed$)
      )
      .subscribe({
        next: res => {
          first = false;
          this.res = res;
          this.counter = REFRESH_INTERVAL;
          this.reset$.next();
        },
        error: error => console.log("Error fethcing data:", error)
      });
  }

  ngOnDestroy() {
    this.closed$.next();
  }
}

模板

<div my-app>
    <ng-container *ngIf="res; else noRes">
        <button (mouseup)="paused ? reset$.next() : pause$.next()">
      {{ paused ? 'Resume poll' : 'Pause poll' }}
    </button>

        <br><br>
        Refreshing in: {{ counter }}
        <br>
    Response:
        <pre>{{ res | json }}</pre>
    </ng-container>

    <ng-template #noRes>
        <button (mouseup)="reset$.next()">
      Start poll
    </button>
    </ng-template>
</div>

我修改了你的Stackblitz

【讨论】:

  • 这真是太棒了!太感谢了!一个快速的问题;除了确保只订阅一次之外,这种方法是否会导致内存泄漏?
  • takeUntil(this.closed$) 理论上应该在组件关闭时关闭打开的订阅。所以我会说不,它不会导致任何内存泄漏。
  • 如果我对此代码使用响应缓慢的 api 端点。它重复请求,直到其中一个解决,这就是我想首先阻止的。我该如何解决?
【解决方案2】:

我会使用intervalfilterscan 运算符的组合,检查频率设置为 1s:

const pauserSubject = new BehaviorSubject(true);

const s$ = interval(1000).pipe(
  withLatestFrom(pauserSubject),
  filter(([intervalValue, isAllowed]) => isAllowed),
  scan(acc => acc + 1, 0),
  tap(emissionNumber => console.log('check attempt #', emissionNumber, ' on ', new Date().toTimeString())),
  filter(emissionCount => !(emissionCount % 10)),
  switchMapTo(of('request/response'))
);

Stackblitz example here

在这种情况下,常规的interval 设置“节拍”,每 1 秒检查一次暂停是真还是假。取消暂停后,相应的filter 运算符允许继续。 scan 验证每 10 次检查(或每 10 秒)将产生一次 http 调用。

对于您的场景:

  1. 每 10 秒开始轮询一次
  2. 你在第 3 秒暂停了一秒
  3. 计数器确保应该再进行 7 次检查(导致等待 7 秒)
  4. http 调用在第 11 秒进行(10 常规 + 1 秒等待)

【讨论】:

    【解决方案3】:

    这归结为有一个“可暂停计时器”。完成后,您只需使用 repeat 运算符在它完成时重新运行它:

    somePausableTimer$.pipe(
        repeat(),
        switchMap(() => of('polling time'))
    )
    

    如果您关心时间精度,这是我对“可暂停计时器”的看法 - 我认为它与 javascript 一样精确,但没有“每 10 次检查暂停状态”等解决方案带来的性能损失毫秒”:

    import {delay, distinctUntilChanged, filter, map, mapTo, pairwise, repeat, 
        scan, startWith, switchMap, take, withLatestFrom} from 'rxjs/operators';
    import {defer, fromEvent, NEVER, Observable, of} from 'rxjs';
    
    function pausableTimer(timerSpan: number, isActive$: Observable<boolean>): Observable<boolean> {
    
        const activeState$ = isActive$.pipe(
            distinctUntilChanged(),
            startWith(true, true),
            map(isActive => ({
                isActive,
                at: Date.now()
            }))
        );
    
        const pauseSpans$ = activeState$.pipe(
            pairwise(),
            filter(([,curr]) => curr.isActive),
            map(([prev, curr]) => curr.at - prev.at)
        );
    
        const accumulatedPauseSpan$ = pauseSpans$.pipe(
            scan((acc, curr) => acc += curr, 0)
        );
    
        return defer(() => {
            const startTime = Date.now();
            const originalEndTime = startTime + timerSpan;
    
            return activeState$.pipe(
                withLatestFrom(accumulatedPauseSpan$),
                switchMap(([activeState, accPause]) => {
                    if (activeState.isActive) {
                        return of(true).pipe(
                            delay(originalEndTime - Date.now() + accPause)
                        );
                    }
                    else {
                        return NEVER;
                    }
                }),
                take(1)
            );
        });
    }
    

    该函数接受以毫秒为单位的timerSpan(在您的情况下为10000)和一个isActive$ observable,它发出true/false 用于恢复/暂停。所以把它放在一起:

    const isActive$ = fromEvent(document, 'click').pipe(scan(acc => !acc, true)); // for example
    
    pausableTimer(10000, isActive$).pipe(
        repeat(),
        switchMap(() => of('polling time'))
    ).subscribe();
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-17
      • 2021-02-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多