【问题标题】:RxJS: An Observable that takes its own old value as inputRxJS:一个将自己的旧值作为输入的 Observable
【发布时间】:2019-09-21 22:06:00
【问题描述】:

我正在尝试实现这样的东西:

// This obviously doesn't work, because we try to refer to `a` before it exists.
// c and m are some Observables.
const aSampled = a.pipe(
  rxjs.operators.sample(c),
  rxjs.operators.startWith(aInitial)
);
const a = m.pipe(
  rxjs.operators.withLatestFrom(aSampled),
  map(([mValue, oldAValue]) => {
    // Do something.
  }),
  rxjs.operators.startWith(aInitial)
);

现在这显然是不可编译的废话,因为我们在创建它之前尝试samplea,但希望它使我的意图明确:a 发出的每个值都应该依赖于 @ 之前发出的旧值之一987654325@。 a 的旧值取决于 c 上次发出某些东西的时间。这有点像在a 上调用pairwise,除了我不想要最后两个值,而是最新的和更远的另一个值。

请注意,如果不是startWith(aInitial) 位,这甚至不是一个定义明确的问题,因为a 发出的第一个值将被循环定义,引用它自己。但是,只要单独指定第一个值a,构造就具有数学意义。我只是不知道如何以干净的方式在代码中实现它。我的预感是,通过编写某种自定义主题会是一种冗长的方式,但更优雅的东西会非常受欢迎。

为了更具体一点,在我的用例中,我处理的是单击并拖动到平移类型的 UI 元素。 mmousemove 事件的 Observable,cmousedown 事件的 Observable。然后,a 将根据光标所在的位置以及单击发生时a 的值不断变化。

【问题讨论】:

    标签: javascript rxjs reactivex


    【解决方案1】:

    如果我理解正确的话,基本的事件流是mousemovemousedown

    基于这些事件流,您必须计算一个新的流a,它以与mousemove 相同的频率发射,但其数据是基于鼠标当前位置和值 a 在最后一个 mousedown 发出时具有。

    所以,如果这是真的,我们可以使用以下 Observables 模拟 mousemovemousedown

    // the mouse is clicked every 1 second
    const c = interval(1000).pipe(
        tap(cVal => console.log('click', cVal))
    );
    // the mouse moves diagonally and emits every 200 ms
    const m = interval(200).pipe(
        map(mVal => [mVal, mVal]),
    );
    

    mousedown 发出时,我们需要以某种方式获得可观察的a 的值。我们怎样才能得到这个?

    假设我们有一个名为value_of_aBehaviourSubject,其初始值为1,并保存了a 的值。如果我们有这样的 Observable,当mousedown 像这样发出时,我们可以得到它的值

    const last_relevant_a = c.pipe(       // when mousedown emits
        switchMap(() => value_of_a.pipe(  // the control is switched to the Observable value_of_a
            take(1),                      // and we consider only its first value
        )),
    );
    

    使用m、mousemove Observable 和last_relevant_a,我们拥有了我们需要的所有 Observable。事实上,我们只需将它们的最新排放量结合起来,就可以获得计算a 的新值所需的所有元素。

    const a = combineLatest(m, last_relevant_a)
    .submit(
       ([mouseVal, old_a_val] => {
          // do something to calculate the new value of a
       }
    );
    

    现在唯一要做的就是确保value_of_a 发出a 发出的任何值。这可以通过在a 本身的订阅中调用value_of_a 上的next 来完成。

    将它们拼接在一起,解决方案可能类似于

    const c = interval(1000).pipe(
        tap(cVal => console.log('click', cVal))
    );
    
    const m = interval(200).pipe(
        map(mVal => [mVal, mVal]),
    );
    
    const value_of_a = new BehaviorSubject<number>(1);
    
    const last_relevant_a = c.pipe(
        switchMap(cVal => value_of_a.pipe(
            take(1),
        )),
    );
    
    const a = combineLatest(m, last_relevant_a);
    
    a.pipe(
        take(20)
    )
    .subscribe(
        val => {
            // new value of a calculated with an arbitrary logic
            const new_value_of_a = val[0][0] * val[0][1] * val[1];
            // the new value of a is emitted by value_of_a
            value_of_a.next(new_value_of_a);
        }
    )
    

    这可能也是expand 运算符的一个用例,但应该对其进行调查。

    【讨论】:

    • 这与我自己发布的解决方案非常相似,但解释得更好。已批准!
    【解决方案2】:

    您可以根据行为主题创建一个名为先前主题的新主题。

    行为主体的来源在这里https://github.com/ReactiveX/rxjs/blob/master/src/internal/BehaviorSubject.ts

    因此,让我们创建一个先前的主题,它会发出当前值以及先前的值。

    import { Subject } from 'rxjs';
    import { Subscriber } from 'rxjs';
    import { Subscription } from 'rxjs';
    import { SubscriptionLike } from 'rxjs';
    import { ObjectUnsubscribedError } from 'rxjs';
    
    export class PreviousSubject<T> extends Subject<T[]> {
    
      _value: T[];
    
      constructor(value: T, previous?: T) {
        super();
        this._value = [value, previous];
      }
    
      get value(): T[] {
        return this.getValue();
      }
    
      /** @deprecated This is an internal implementation detail, do not use. */
      _subscribe(subscriber: Subscriber<T[]>): Subscription {
        const subscription = super._subscribe(subscriber);
        if (subscription && !(<SubscriptionLike>subscription).closed) {
          subscriber.next(this._value);
        }
        return subscription;
      }
    
      getValue(): T[] {
        if (this.hasError) {
          throw this.thrownError;
        } else if (this.closed) {
          throw new ObjectUnsubscribedError();
        } else {
          return this._value;
        }
      }
    
      next(value: T): void {
        this._value = [value, this._value[0]];
        super.next(this._value);
      }
    }
    

    https://stackblitz.com/edit/typescript-wiayqx查看演示

    如果你不想要 TypeScript,也可以使用 vanilla JS

    const { Subject } = rxjs;
    
    class PreviousSubject extends Subject {
    
      _value;
    
      constructor(value, previous) {
        super();
        this._value = [value, previous];
      }
    
      get value() {
        return this.getValue();
      }
    
      _subscribe(subscriber) {
        const subscription = super._subscribe(subscriber);
        if (subscription && !(subscription).closed) {
          subscriber.next(this._value);
        }
        return subscription;
      }
    
      getValue() {
        if (this.hasError) {
          throw this.thrownError;
        } else if (this.closed) {
          throw new ObjectUnsubscribedError();
        } else {
          return this._value;
        }
      }
    
      next(value) {
        this._value = [value, this._value[0]];
        super.next(this._value);
      }
    }
    
    let ps$ = new PreviousSubject('Start value');
    
    ps$.subscribe(([current, previous]) => {
      console.log(`Current: ${current}, Previous: ${previous}`);
    });
    
    ps$.next('Value');
    ps$.next('Another value');
    ps$.next('Another value again');
    &lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"&gt;&lt;/script&gt;

    【讨论】:

      【解决方案3】:

      经过一番思考,这是我自己的解决方案,以及一个最小的工作示例:

      // Setup for MWE.
      // Note that c just emits once a second, the actual values of it don't matter.
      const c = rxjs.interval(1000).pipe(rxjs.operators.take(2));
      const m = rxjs.interval(300).pipe(rxjs.operators.take(9));
      const aInitial = -1;
      
      // Initialize a.
      const a = new rxjs.BehaviorSubject();
      a.next(aInitial);
      
      // For testing, log the values a emits.
      a.subscribe((val) => {
        console.log(`a: ${val}`);
      });
      
      // Sample a at each emission of c.
      const aSampled = a.pipe(
        rxjs.operators.sample(c),
        rxjs.operators.startWith(aInitial),
      );
      
      // Every time m emits, get also the latest sampled value of a, and do something
      // with the two.
      m.pipe(
        rxjs.operators.withLatestFrom(aSampled),
        rxjs.operators.map(([mValue, oldAValue]) => {
          // In place of actually computing something useful, just log the arguments
          // and for the sake of demonstration return their product.
          console.log(`m: ${mValue}, old a at last c-emission: ${oldAValue}`);
          return mValue*oldAValue;
        }),
      ).subscribe(a);  // Route the output back into a.
      

      输出

      a: -1
      m: 0, old a at last c-emission: -1
      a: 0
      m: 1, old a at last c-emission: -1
      a: -1
      m: 2, old a at last c-emission: -1
      a: -2
      m: 3, old a at last c-emission: -2
      a: -6
      m: 4, old a at last c-emission: -2
      a: -8
      m: 5, old a at last c-emission: -2
      a: -10
      m: 6, old a at last c-emission: -10
      a: -60
      m: 7, old a at last c-emission: -10
      a: -70
      m: 8, old a at last c-emission: -10
      a: -80
      

      诀窍在于,将 a 设置为 Subject 允许我们首先发出其初始值,然后在将其连接到管道之后,该管道将 a 的较旧的采样值作为其输入之一。 Plain Observables 不允许这样做(AFAIK),因为您必须在创建时定义它们的所有输入。不过不需要自定义的 Subject 类。

      【讨论】:

        猜你喜欢
        • 2020-11-17
        • 1970-01-01
        • 1970-01-01
        • 2021-12-20
        • 1970-01-01
        • 2017-11-19
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多