【问题标题】:Synchronously process Observable to prevent too many HTTP requests from timing out using rxjs Angular and ngrx使用 rxjs Angular 和 ngrx 同步处理 Observable 以防止过多的 HTTP 请求超时
【发布时间】:2018-10-12 21:43:41
【问题描述】:

我有以下代码。它获取一个患者数组并构建一个行对象this.rows,我在这个 Angular 4 组件前端的表格中显示该对象(我也在使用 rxjs 5.5)。

我的问题是每行的 hasAlert 属性是通过调用 hasAlerts() 分配的。在 HasAlerts 方法中,我通过 this.dataService.fetchItems<Observation>( 为每位患者发出一个 http 请求。

当有很多患者时,异步发生的 HTTP 请求过多,它们将从 HasAlerts() 开始失败(超时)。有没有办法限制这个,或者从 hasAlerts() 一次处理一个 Observable?

以下是解决此问题的可能方法

  1. 我认为 concatMap 可能会有所帮助,但我不确定如何使用它。还应该使用它来处理每个患者/行,还是应该尝试将每个可观察到的 hasAlert() 聚合到一个列表中,然后尝试使用 concatMap 一次处理一个?
  2. 一种解决方法是我在注释中添加的 patientsAlertsProcessed 变量包装器 // 用于限制 hasAlertsMethod。我将其限制为仅使用该方法 15 次以避免超时问题。这并不理想,因为一旦这 15 个异步请求完成,我就无法再次启动它。

代码如下

ngOnInit(): void {
    this.store.dispatch(new patients.Load([]));

    this.patients$ = this.store.select(fromPatients.getAll); 
    var patientsAlertsProcessed =0;
    this.patients$.debounceTime(2000).map(p =>{//Emits Observable<Patient[]>s, the debounceTime is to take the last one as it builds up
      this.rows = p.map(pat => {// p = Patient[], iterate through the array of patients
        var observations=0;
        var rowX= {
          username: pat.username,
          id: pat.id,
          hasAlert:false, 
        };

        if (patientsAlertsProcessed<15){// this is done to throttle the HasAlerts Method
          this.hasAlerts(pat).do(x => {
            observations++;
            if (observations>0)
            {
              rowX.hasAlert=true;
            }
          }).subscribe();
          patientsAlertsProcessed++;
        }
        return rowX;
      });
    }).subscribe(
       ()=> { },
       ()=> {
        this.table.recalculatePages();
      }
    );

  }
  hasAlerts(pat: Patient): Observable<Observation> {
    var obs$= this.dataService.fetchItems<Observation>(// this is making an HTTP get request 
        "Observation",
        null,
        pat.id
      ).filter(function (x){
        if (x.category.coding[0].code == "GlucoseEvent"){ 
            return true;
        }
        else{
          return false; 
        }
      }
    );
    return obs$;
  }

【问题讨论】:

    标签: angular rxjs observable rxjs5 ngrx


    【解决方案1】:

    您可能想尝试这些方面的东西。

    const concurrency = 10; 
    const rowsWithAlerts = [];
    
    this.patients$.debounceTime(2000)
    .switchMap(patients => Observable.from(patients)) // patients is of type Patient[]
    .mergeMap(patient => {
       rowsWithAlerts.push(patient);
       this.hasAlerts(patient).do(
       hasAlertsResult => {
         // here you have hold to both the patient and the result of
         // hasAlerts call
         patient.hasAlerts = hasAlertsResult;
       }}), concurrency)
    .subscribe(
       () => this.rows = rowsWithAlerts;
    )
    

    这里的关键是使用mergeMap 运算符并将并发级别设置为一个值,在此示例中为 10,但显然可以是任何值。

    这允许限制您同时订阅的 observable 的数量,这在您的情况下意味着限制您进行的 http 调用的数量。

    【讨论】:

    • 要去试试这个,我注意到你已经掌握了 rxjs。熟悉 rxjs 后,有什么资源对你有帮助吗?
    • 我设置 this.rows 的逻辑我会在 switchMap 行中 => 之后的块中执行此操作吗?
    • 我不建议这样做。您必须考虑到作为 switchMap 参数获得的 patients 数组与使用 rowsWithAlerts 获得的数组不同。它们是数组的不同实例,可能包含不同的 Person 实例。
    • 在您的示例中,我应该在哪里分配用户名 ID 并将警报作为属性到 rowX?同样在 switch 映射行 observable.from(p) 中 p 是未定义的
    • 你说得对,应该不是p,而是switchMap内的患者。我还更新了我的答案,以便您为每位患者设置 hasAlerts 标志并将每位患者推入 rows 数组。
    猜你喜欢
    • 2021-05-26
    • 2018-09-16
    • 1970-01-01
    • 1970-01-01
    • 2018-11-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-07
    相关资源
    最近更新 更多