【问题标题】:How to pipe series of sync. and async. tasks using RxJs Operators in an Angular service如何管道系列同步。和异步。在 Angular 服务中使用 RxJs 运算符的任务
【发布时间】:2020-11-22 03:37:18
【问题描述】:

这里是 3 个典型步骤组的场景

  1. 准备工作
  2. 检索数据
  3. 清理工作

我正在寻找处理这些设置在数组中的步骤的最佳安排(见下文)。

条件:

  • 重要提示:前两个步骤组必须可以在任何时间点/任何时间被事件或主题或类似取消 (例如,由用户点击触发)。

  • 如果触发了取消/停止,仍然需要运行第 3 组的步骤来进行清理工作

  • 1.准备工作:

  • 1.1。这组1的动作步骤必须一个一个地执行

  • 1.2。除非取消,否则所有步骤都会执行

  • 1.3。 (可以以任何方式编码 - 作为函数、Observable、Promise 等)

  • 2。检索数据

  • 2.1。准备工作后执行步骤(一)

  • 2.2。此步骤使用 API,其返回数据为:Promise

  • 2.3。结果数据立即返回给调用者/订阅者(不等待清理工作)

  • 2.4。返回的结果必须再次为:Promise

  • 3.清理工作

  • 3.1。在第 2 步之后开始

  • 3.2。可以异步/随时/稍后运行,但在第 1 步和第 2 步之后

  • 3.3。第3组的步骤必须一个一个地执行

  • 3.4。 (可以以任何方式编码 - 作为函数、Observable、Promise 等)

这是我需要修改的框架代码,尤其是方法内的代码:

handleDataRequest()


import { Injectable } from '@nestjs/common';
import { from, Subject } from 'rxjs';
import { filter, map, switchMap, mergeMap,subscribeOn, take, takeUntil, } from 'rxjs/operators';


export interface ActionStep {
   id: string,
   taskGroup: number  // 1 = Prep.Work, 2 = Data, 3 = Cleanup
   // etc.
}


export const ActionSteps: ActionStep[] = [
  { id: 'PrepWork1', taskGroup: 1 },
  { id: 'PrepWork2', taskGroup: 1 },
  { id: 'PrepWork3', taskGroup: 1  },
  { id: 'Data',      taskGroup: 2  },
  { id: 'Cleanup1',  taskGroup: 3   },
  { id: 'Cleanup2',  taskGroup: 3   },
]


@Injectable()
export class DataRequestService<T> {


  public requestCanceled$: Subject<boolean>;


  handleDataRequest(actionSteps: ActionStep[]): Promise<T[]> {

    const actionSteps$ = from(actionSteps);

    actionSteps$.pipe(
      filter(step => step.taskGroup === 1),  // 1 = Prep.Work
      takeUntil(this.requestCanceled$),
      map((step: ActionStep,) => this.prepWork(step)));

    const result = actionSteps$.pipe(
      filter(step => step.taskGroup === 2), // 2 = Data
      takeUntil(this.requestCanceled$),
      map((step: ActionStep) => this.getData(step)));


    actionSteps$.pipe(
      filter(step => step.taskGroup === 3), // 3 = Cleanup
      map((step: ActionStep,) => this.cleanupWork(step)));

    return result;

  }


  private prepWork(param)  {
      // doing prep. work ...
  }

  private getData<T>(param): Promise<T[]> {
    let data: Promise<T[]>;
      // getting data ...
    return data;
  }

  private cleanupWork(param) {
      // doing cleanup ...
  }
  
}

【问题讨论】:

    标签: angular rxjs


    【解决方案1】:

    我认为你不能返回 Promise。我猜你必须返回 Observable>。我写了一些代码,也许这就是你想要的:

    public requestCanceled$: Subject<boolean>;
    
      public stepOneFinished$: Subject<boolean>;
      public stepTwoFinished$: Subject<boolean>;
    
      handleDataRequest(actionSteps: ActionStep[]): Observable<Promise<T[]>> {
        const startSTepTwo$ = merge(
          this.requestCanceled$,
          this.stepOneFinished$
        ).pipe(take(1));
        const startStepThree$ = merge(
          this.requestCanceled$,
          this.stepTwoFinished$
        ).pipe(take(1));
    
    const actionSteps$ = from(actionSteps);
    
    const ObservableOne = actionSteps$.pipe(
      filter((step: ActionStep) => step.taskGroup === 1), // 1 = Prep.Work
      takeUntil(this.requestCanceled$),
      map((step: ActionStep) => {
        this.prepWork(step);
        this.stepOneFinished$.next();
        this.stepOneFinished$.complete();
        return true;
      })
    );
    
    const ObservableTwo = startSTepTwo$.pipe(
      mergeMap(x =>
        actionSteps$.pipe(
          filter((step: ActionStep) => step.taskGroup === 2), // 2 = Data
          takeUntil(this.requestCanceled$),
          map((step: ActionStep) => {
            return this.getData(step);
          }),
          tap(d => {
            this.stepTwoFinished$.next();
            this.stepTwoFinished$.complete();
          })
        )
      )
    );
    
    const ObservableThree = startStepThree$.pipe(
      mergeMap(b =>
        actionSteps$.pipe(
          filter(step => step.taskGroup === 3), // 3 = Cleanup
          map((step: ActionStep) => {
            this.cleanupWork(step);
            return true;
          })
        )
      )
    );
    
    return combineLatest(ObservableOne, ObservableTwo, ObservableThree).pipe(
      map(d => d[1])
    );
    }
    

    【讨论】:

    • toPromise 运算符是否允许Promise&lt;T[]&gt;
    • 我想这会给你承诺>
    • 看起来是一个很好的开始和一些简化的候选者。非常感谢。
    【解决方案2】:

    这里是结合 Observable 和 Promise 工具的解决方案的一个版本,供可能感兴趣的人使用。欢迎提出意见和调整。

    要测试,请将类粘贴到您的 Angular 测试项目中,创建一个实例,例如

    const aDataRequestDemo = new DataRequestDemo();
                         // run like this
    aDataRequestDemo.dataRequestTest();
    

    代码如下:

    import { Subject, from, merge } from 'rxjs';
    import { filter, take, takeUntil, tap, delay } from 'rxjs/operators';
    
    
    export interface ActionStep {
       id: string,
       taskGroup: number  // 1 = Prep.Work, 2 = Data, 3 = Cleanup
       // etc.
    }
    
    const actionSteps: ActionStep[] = [
      { id: 'PrepWork1', taskGroup: 11 },
      { id: 'PrepWork2', taskGroup: 11 },
      { id: 'PrepWork3', taskGroup: 11  },
      { id: 'Data',      taskGroup: 21  },
      { id: 'Cleanup1',  taskGroup: 31   },
      { id: 'Cleanup2',  taskGroup: 31   },
    ]
    
    
    
    export class DataRequestDemo {
    
      requestCanceled$: Subject<any> = new Subject<any>();
      requestCanceled: boolean;
      
                        /**
                         * Test mentod for a data request 
                         * from and async source 
                         * - returning a Promise<T[]> data 
                         */
      async dataRequestTest() {
        const data: string[] = 
          await this.dataRequest();
        console.log('-- RETREIVED DATA: ', JSON.stringify(data));
      }
        
                        /**
                         * Test mentod for a data request 
                         * from and async source 
                         * - returning a Promise<T[]> data 
                         */
      async dataRequest(): Promise<string[]> {
                        // run preparation tasks
        await this.prepTasks();
                        // get data from source/db
        const data = await this.getData('x');
                        // Run cleanup tasks, later, 
                        // 1st retur data to requestor.
                        // NOTE: Cleanup always runs,
                        // even if request was canceled
        this.cleanup(1000);
                        // if request got canceled, 
                        // return undefined
        if (this.requestCanceled) 
          return undefined;
                        // if request NOT canceled, 
                        // return data
        return data;
      }
    
      
                        /**
                         * Performs all preparation tasks
                         * outlined in the actionSteps array
                         * - is cancelable
                         */
      async prepTasks(): Promise<boolean> {
        if (this.requestCanceled)
          return undefined;
        const max = actionSteps.filter(step => step.taskGroup === 11).length;
        from(actionSteps).pipe(
                        // only select tasks of taskGroup = 11 = Prep.Work
          filter(step => step.taskGroup === 11),
                        // will unsubscribe after all tasks done
          take(max),
                        // will unsubscribe if user cancles
          takeUntil(
            merge(this.requestCanceled$)
          ),
          tap((step: ActionStep) => this.prepWork(step)),
        ).subscribe();
        return true
      }
                        /**
                         * Performs all cleanup tasks
                         * outlined in the actionSteps array.
                         * Runs delayed - after prep tasks and
                         * after retreived data sent to requestor; 
                         * - not cancelable, always runs
                         */
      private cleanup(ms: number) {
        const max = actionSteps.filter(step => step.taskGroup === 31).length;
        console.log('--cleanup max: ', max);
        from(actionSteps).pipe(
                        // delay start of the tasks by x millis
                        // (not each task is delayed, only the start)
          delay(ms),
                        // only select tasks of taskGroup = 31 / Cleanup
          filter(step => step.taskGroup === 31),
                        // will unsubscribe after all tasks done
          take(max),
                        // run the taks fn:
          tap((step: ActionStep) => this.cleanupWork(step))
        ).subscribe(v => console.log('--CLEANUP: ', v));
      }
    
      
    
                        // simulates prep. tasks
      private prepWork(param): boolean  {
        console.log('-- prepWork, param: ', param);
        return true;
      }
    
                        // simulates data retreival from source / db / ...
      private async getData(param): Promise<string[]> {
    
        if (this.requestCanceled) 
          return undefined;
    
        console.log('-- getData, param: ', param);
    
        let data: string[] = [
          'test-data-1', 
          'test-data-2'
        ];
        return data;
      }
    
                        // simulates cleanup tasks
      private cleanupWork(param) {
        console.log('-- cleanupWork, param: ', param);
      }
    
                        /**
                         * User can use this fn. to cancel
                         * the dataRequestTest() method.
                         * It is set for cancelation of both steps,
                         * sync and async 
                         */
      cancel() {
        this.requestCanceled = true;
        this.requestCanceled$.next();
        this.requestCanceled$.complete();
      }
    
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-12-15
      • 2020-08-30
      • 2020-03-26
      • 1970-01-01
      • 2018-12-28
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多