【问题标题】:How to pause and buffer Observables in RxJS v5如何在 RxJS v5 中暂停和缓冲 Observables
【发布时间】:2016-01-29 16:26:10
【问题描述】:

我正在尝试对 HTTP 请求实施背压策略,以在特定条件下暂时阻止待处理的请求数秒。暂停的逻辑将基于另一个 Observable。

我的研究和理解使我相信pausableBuffered 运算符完全符合我的需要。记录在这里http://reactivex.io/documentation/operators/backpressure.html

但是我在 ReactiveX v5 (5.0.0-beta.0) 中找不到这个运算符,迁移指南 (v4 - v5) 似乎表明它们已被删除。如果是这种情况,我如何使用 v5 可用的运算符来达到预期的效果?

【问题讨论】:

    标签: reactive-programming rxjs reactive-streams


    【解决方案1】:

    目前的背压故事完全是dropped

    这是获得相同结果的一种方法:

    const pausableBuffered = (observable, pauser) => {
        const subj = new rx.Subject();
    
        let buffer = [];
        const nextEmitter = x => subj.next(x);
        const nextBuffer = x => buffer.push(x);
    
        let subscriber = nextEmitter;
        observable.subscribe(x => subscriber(x));
    
        pauser.subscribe(value => {
            if (value) {
                subscriber = nextBuffer;
            } else {
                buffer.forEach(nextEmitter);
                buffer = [];
                subscriber = nextEmitter;
            }
        })
    
        return subj;
    };
    

    【讨论】:

    • observablepauser 正在泄露订阅。
    【解决方案2】:

    我偶然发现了这个答案,对于我的用例,我把它变成了一个管道

    import { Observable, Subject, Subscription } from "rxjs";
    
    export function pausable(pauseToken: Observable<boolean>, startPuased: boolean, lastOnly: boolean) {
        return function <T>(source: Subject<T>): Observable<T> {
            let buffer: T[] = [];
            const nextEmitter = (x: T) => subj.next(x);
            const nextBuffer = (x: any) => buffer.push(x);
    
            var sourceSubscription: Subscription;
            var pauseSubscription: Subscription;
    
            var subj = new Subject<T>();
    
            let subscriber = nextEmitter;
            if (startPuased) {
                subscriber = nextBuffer;
            }
            sourceSubscription = source.subscribe({
                next(value) {
                    subscriber(value);
                },
                error(error) {
                    subj.error(error);
                },
                complete() {
                    subj.complete();
                    pauseSubscription?.unsubscribe();
                }
            })
    
            pauseSubscription = pauseToken.subscribe({
                next(value) {
                    if (value) {
                        subscriber = nextBuffer;
                    } else {
                        if (lastOnly && buffer.length > 0) {
                            nextEmitter(buffer.pop())
                        } else {
                            buffer.forEach(nextEmitter);
                        }
                        buffer = [];
                        subscriber = nextEmitter;
                    }
                },
                complete() {
                    sourceSubscription?.unsubscribe();
                    pauseSubscription?.unsubscribe();
                }
            });
    
            return subj;
        }
    }

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-07-16
      • 2015-12-04
      • 2015-11-19
      • 1970-01-01
      • 1970-01-01
      • 2012-06-30
      • 2019-11-23
      • 2012-05-25
      相关资源
      最近更新 更多