【问题标题】:RxJS Subject closing subscriptions automatically when finishedRxJS Subject 完成后自动关闭订阅
【发布时间】:2020-10-30 00:45:00
【问题描述】:

我在服务中公开了两个主题,因此我可以呈现主题中的值并将它们呈现在我的页面上。

其中一个主题正确接收了信息,但第二个主题自动关闭订阅,我无法再检索数据。

这是我的代码:

// service.ts
  clientSubject: Subject<any>;
  jobSubject: Subject<any>;

  constructor(private clientService: ClientService, private jobService: JobService) {
    this.clientSubject = new Subject();
    this.jobSubject = new Subject();
  }

  fetchData(): void {
    this.clientService.getClients()
    .pipe(
      map(clients => clients.map(
        client => this.jobService.getJobs(client.clientId)
          .pipe(
            map(jobs => jobs.map(job => ({client, job})))
          )
          .subscribe(this.jobSubject)
        )
    ))
    .subscribe(this.clientSubject);
  }

这是我的组件中的代码:

  clients: any;
  jobs: any;

  constructor(private invoiceService: InvoiceService) {}

  ngOnInit(): void {
    this.invoiceService.clientSubject.subscribe(d => {
      this.clients = d;
      console.log(this.clients);
    });

    this.invoiceService.jobSubject.subscribe(d => {
      this.jobs = d;
      console.log(this.jobs);
    });

    this.invoiceService.fetchData();
  }

这是控制台记录的内容:

### this.jobs data ###
invoices.component.ts:25 
[{…}]
0: {client: {…}, job: {…}}
length: 1
__proto__: Array(0)


### this.clients data ###
invoices.component.ts:19 
(2) [SubjectSubscriber, SubjectSubscriber]
0: SubjectSubscriber {closed: true, _parentOrParents: null, _subscriptions: null, syncErrorValue: null, syncErrorThrown: false, …}
1: SubjectSubscriber {closed: true, _parentOrParents: null, _subscriptions: null, syncErrorValue: null, syncErrorThrown: false, …}
length: 2
__proto__: Array(0)

为什么主题订阅者会根据fetchData 中的实现覆盖this.clients 数据?

jobService.getJobs(client.clientId) 返回一个 observable,格式如下:

[{jobId: 1, name: 'blabla', items: []}]

jobService.getClients() 返回一个 observable,格式如下:

[{clientId: 1, name: 'blabla', isParent: false}]

【问题讨论】:

    标签: angular rxjs


    【解决方案1】:

    现在。 Map 正在将一组客户端转换为一组订阅。这就是clients.map([...]。是在做。也许让您感到困惑的是.subscribe 返回一个值?像这样:

    const valueReturnedBySubscribe = anyObservable.subscribe(LAMBDA);
    console.log(valueReturnedBySubscribe);
    

    输出:

    SubjectSubscriber {closed: true, [...]
    

    试试这个:

    fetchData(): void {
      this.clientService.getClients().pipe(
        tap(clients => clients.forEach(
          client => this.jobService.getJobs(client.clientId)
            .pipe(
              map(jobs => jobs.map(job => ({client, job})))
            )
            .subscribe(this.jobSubject)
          )
        )
      )
      .subscribe(this.clientSubject);
    }
    

    tap 根本不会更改您的流,而且您似乎也不想要订阅对象。不知道你为什么要使用地图,但看起来你不需要它。


    更新

    让 jobSubject 订阅 clientSubject 可能是最有意义的,因为新客户总是意味着对新工作的查询。这可能看起来像:

    clientSubject: Subject<any>;
    jobSubject: Subject<any>;
    
    constructor(private clientService: ClientService, private jobService: JobService) {
    
      this.clientSubject = new Subject();
      this.jobSubject = new Subject();
    
      this.clientSubject.pipe(
        map(clients => clients.map(
          client => jobService.getJobs(client.clientId).pipe(
            map(jobs => jobs.map(job => ({client, job})))
          )
        )),
        mergeMap(jobs => merge(jobs))
      ).subscribe(this.jobSubject);
    }
    
    fetchData(): void {
      this.clientService.getClients().subscribe(this.clientSubject);
    }
    

    旁白:

    现在,您的主题(clientSubject 和 jobSubject)是短暂的。当他们订阅的流之一完成(或错误)时,他们完成。如果你想避免这种情况,你可以只订阅值而不是源流的complete()error() 消息

    你需要做的就是改变:

    this.clientSubject/*...code...*/subscribe(this.jobSubject);
    to
    this.clientSubject/*...code...*/subscribe(this.jobSubject.next);
    

    this.clientService.getClients().subscribe(this.clientSubject);
    to
    this.clientService.getClients().subscribe(this.clientSubject.next);
    

    【讨论】:

      【解决方案2】:

      这里有多个问题

      1. 您从 map(clients... 返回订阅,该订阅随后被推送到 this.clientSubject

      2. getClients() 函数返回的数组触发了多个订阅。相反,您可以使用forkJoin 函数来并行触发多个可观察对象。

      3. 您可以switchMap 链接可观察对象而不是嵌套订阅。

      试试下面的

      fetchData(): void {
        this.clientService.getClients().pipe(
          tap(clients => this.clientSubject.next(clients)), // <-- emit to `clientSubject` using `tap` operator
          switchMap(clients => forkJoin(clients.map(client => // <-- use `forkJoin` to trigger multiple observables in parallel
            this.jobService.getJobs(client.clientId).pipe(
              map(jobs => jobs.map(job => ({client, job})))
            )
          )))
        ).subscribe(
          jobs => this.jobSubject.next(jobs)
        );
      }
      

      一个不同之处在于,jobSubject 现在将发出 [{client, job}, {client, job}, ...] 形式的作业数组,而不是单独发出每个作业。

      更新:在每个源通知上发出

      您可以使用tap 运算符的nexterrorcomplete 回调向各自的主题发出每个相应的通知。

      fetchData(): void {
        this.clientService.getClients().pipe(
          tap({ // <-- emit to `clientSubject` using `tap` operator
            next: clients => this.clientSubject.next(clients),
            error: error => this.clientSubject.next(error),
            complete: () => this.clientSubject.complete()
          }),
          switchMap(clients => forkJoin(clients.map(client => // <-- use `forkJoin` to trigger multiple observables in parallel
            this.jobService.getJobs(client.clientId).pipe(
              map(jobs => jobs.map(job => ({client, job}))),
              tap({
                next: jobs => this.jobSubject.next(jobs),
                error: error => this.jobSubject.error(error),
                complete: () => this.jobSubject.complete()  // <-- BAD IDEA! SEE BELOW
              })
            )
          )))
        ).subscribe();
      }
      

      但是,向 Subject 发出 complete 基本上会关闭它。你不能再向它发出,未来的通知将被忽略。因此,除非您知道自己在做什么,否则我不会在 tap 中建议 this.jobSubject.complete()

      【讨论】:

      • observable.subscribe(subject) 不同,这不会将error()complete() 排放传递给他的对象。
      • @MrkSef:我已经更新了答案,使用tap 为相应类型的通知发出两个源 observables。
      • 是的,“我应该准确地回答发帖人的问题,还是应该回答我认为他们想问的问题”,这总是一种很好的舞蹈。 - 我怀疑你是对的,他不希望他的对象在源 observable 完成的那一刻完成。
      猜你喜欢
      • 1970-01-01
      • 2021-12-27
      • 1970-01-01
      • 2021-11-16
      • 2020-07-13
      • 2019-12-29
      • 1970-01-01
      • 2020-02-14
      • 2019-05-10
      相关资源
      最近更新 更多