【问题标题】:rxjs - buffer emitted values until other observable emits, then emit as normalrxjs - 缓冲发出的值,直到其他 observable 发出,然后正常发出
【发布时间】:2020-11-03 04:08:00
【问题描述】:

我正在构建一个记录器对象,它异步获取 IP 地址,然后使用该 IP 地址记录所有值。它必须在实例化时就开始收集日志,但只有在获得 IP 地址后才发出它们;之后它应该会正常发射。

这是我的课:


class LoggerService {
  constructor() {
    let thisIp;
    const getIp = Observable.create(function(observer) {
      // doing it with a timeout to emulate bad network
      setTimeout(() => {
        fetch('https://api.ipify.org?format=json').then(response => response.json()).then(response => {
          thisIp = response.ip;
          console.log('fetched IP: ', thisIp);
          observer.next(response.ip);
          observer.complete();
        });
      }, 5000)
    });
    // this is where I plan to buffer logs until IP is obtained
    this.logStream = new Subject().pipe(buffer(getIp));
    // for starters - just log to the console with the IP address
    this.logStream.subscribe((value) => console.log(thisIp, value));

  }

  emit = (message) => this.logStream.next(message);
}

但它不能按我的需要工作;它确实将所有缓冲值作为数组输出,但在获得 IP 后停止发送它们:


const logger = new LoggerService();


setInterval(() => {
  logger.emit('Hey ' + Math.random())
}, 1000);

// I get five messages and that's it

即使在缓冲之后,如何让它发出我的值?

【问题讨论】:

  • 你看过replaySubject而不是主题吗?听起来您可能正在尝试重新发明一个 replaySubject。

标签: rxjs


【解决方案1】:

更新:

一年半后回头看,我注意到 combineLatest/of 是不必要的;我们可以简单地通过管道将 ip observable 映射到所需的形状。这是我今天的做法:

export class LoggerService {
  private messages$ = new Subject<string>();

  private formattedMessages$ = this.messages$.pipe(
    mergeMap(message => this.service.ipAddress$.pipe(
      map(ip => `[${ip}] ${message}`)
    ))
  );

  constructor(private service: GenericService) { 
    this.formattedMessages$.subscribe(
      message => console.log(message) // actual logging logic goes here...
    );
  }

  public log(message: string) {
    this.messages$.next(message);
  }
}

原答案

您不需要“缓冲”这些值本身,但您可以创建一个依赖于异步 ipAddress$ 的流,因此在发出 ip 地址之前不会发出该值。 combineLatest 可以很好地实现此目的。

让我们为LoggerService 提供一个名为message$ 的消息流和一个简单的log() 方法,该方法将提供的字符串推送到此流中。

我们可以构造一个messagesWithIpAddresses$ 流,它使用combineLatest 创建一个可观察对象,它与ipAddress$ 一起发出提供的消息,但前提是两者都实际发出了一个值。

export class LoggerService {
  private messages$ = new Subject<string>();

  public log(message: string): void {
    this.messages$.next(message);
  }

  constructor(service: GenericService) { 

    const messagesWithIpAddresses$ = this.messages$.pipe(
      mergeMap(message => combineLatest(service.ipAddress$, of(message)))
    );

    messagesWithIpAddresses$.subscribe(
      ([ip, message]) => {
        // actual logging logic would go here...
        console.log(`[${ip}] ${message}`);
      }
    );
  }
}

由于of(message) 将立即发出,我们将等待ipAddress$。但是,如果一个值已经发出,那么它也会立即发出。

看看这个工作StackBlitz

【讨论】:

    【解决方案2】:

    更新:以下答案无效。查看后buffer documentation 您的记录器 Subject() 仍然不会再发出消息,因为它仅在 getIp 触发时触发。在您的代码中,getIp 仅在 http 请求之后触发一次。

    我认为我们需要更多关于您想要实现的目标的详细信息,以便提出正确的 rxjs 管道。


    我会删除这一行

    observer.complete();  // remove it
    

    这标志着流结束的下标,这就是你不再收到任何消息的原因。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-01-18
      相关资源
      最近更新 更多