【问题标题】:RxJS get completed result on subscribe callbackRxJS 在订阅回调中获得完成的结果
【发布时间】:2021-05-16 04:07:18
【问题描述】:

我是 RxJS 初学者,我想让我的代码更简洁。

这是我写的代码,我在接收完成结果时遇到问题。

我想让我的代码更简洁,在订阅回调中获取结果,而不使用其他 Subject 来接收结果。

这是我的代码,rxjs 7.0.1:

import colors from 'colors/safe';
import { Subject } from 'rxjs';
import { throttle } from 'rxjs/operators';

interface IEventTask {
    id: string,
    createdTime: number
}

let global_counter: number = 0;

const mockHTTPRequest = async (event: IEventTask) => {
    return (Promise.resolve().then(async () => {
        await new Promise((resolve) => {
            global.setTimeout(resolve, 1000);
        });

        if (event.id === '01') {
            throw new Error(`error: ${event.id}`);
        }

        const result = `result: ${event.id}`;
        // I don't know how to get result, so I publish to another Subscriber
        subscriber.next(result);
        return result;
    }))
    .catch((error) => {
        console.error(error);
    })
}

const subject = new Subject<IEventTask>();
subject
    .pipe(
        throttle(mockHTTPRequest, {
            leading: true, trailing: true
        })
    )
    .subscribe({
        next: (value) => {
            console.log(`${colors.blue(`starting`)} Task#${value.id} at: ${global_counter++}`);
        },
        error: (error) => {
            console.error(error);
        },
        complete: () => {
            // How can I get Promise resolved result here?
            console.log(`completed`);
        }
    });

// It's not the code I want to use, but I don't know how to make it easy.
const subscriber = new Subject<string>();
subscriber.subscribe((result: string) => {
    console.log(`${colors.green(`finished`)} ${result} at: ${global_counter++}`);
});

(async () => {
    for (let i = 1; i <= 10; i++) {
        await new Promise((resolve) => {
            global.setTimeout(() => {
                resolve(true);
            }, 125);
        });

        const value: IEventTask = {
            id: (i).toString().padStart(2, '0'),
            createdTime: global_counter++
        };

        subject.next(value);
    }
})();

非常感谢。

【问题讨论】:

    标签: javascript typescript rxjs


    【解决方案1】:

    在我看来,每当subject(变量)获得新值时,您都想发出 HTTP 请求。

    所以,您使用throttle 的想法是,它应该等待HTTP 请求直到它完成并且它应该提供返回值。问题在于,throttle 不提供后者。 (见documentation

    我建议你改用switchMap。它期望传递的函数返回一个 observable,当它发出时,switchMap 将转发该值。此外,每当发出新值时,它都会完成内部可观察对象。这意味着如果 HTTP 请求尚未完成,它将终止当前请求并发出新请求。 (见documentation

    import { Subject, Observable, OperatorFunction, pipe, UnaryFunction } from 'rxjs';
    import { filter, switchMap } from 'rxjs/operators';
    
    const mockHTTPRequest = async (event: IEventTask) => {
      return (Promise.resolve().then(async () => {
        await new Promise((resolve) => {
          setTimeout(resolve, 2000);
        });
    
        if (event.id === '01') {
          throw new Error(`error: ${event.id}`);
        }
    
        const result = `result: ${event.id}`;
        // This was needed in order to provide the id in the subscribe section
        return {
          id: event.id,
          result
        };
      }))
        .catch((error) => {
          console.error(error);
          // This was needed for the filter function to work
          return undefined;
        })
    }
    
    // this filters nullish values: null and undefined
    function filterNullish<T>(): UnaryFunction<Observable<T | null | undefined>, Observable<T>> {
      return pipe(
        filter(x => x != null) as OperatorFunction<T | null | undefined, T>
      );
    }
    
    const subject = new Subject<IEventTask>();
    subject
      .pipe(
        switchMap(mockHTTPRequest),
        filterNullish()
      )
      .subscribe({
        next: (value) => {
          console.log(`starting Task#${value.id} at: ${global_counter++}`);
        },
        error: console.error,
        complete: () => {
          // How can I get Promise resolved result here?
          console.log(`completed`);
        }
      });
    

    如果您不想取消请求,而是想要获取传递给subject 的每个值的结果,那么您应该使用查看此评论:https://stackoverflow.com/a/50809667/12851879

    更新:

    这是带有mergeMap 的完整代码(因此无论持续时间如何,都要考虑所有结果)

    import colors from 'colors/safe';
    import { Subject, Observable, OperatorFunction, pipe, UnaryFunction } from 'rxjs';
    import { filter, mergeMap, tap } from 'rxjs/operators';
    
    interface IEventTask {
      id: string,
      createdTime: number
    }
    
    let global_counter: number = 0;
    
    function waitFor(time: number) {
      return new Promise((resolve) => {
        setTimeout(resolve, time);
      })
    }
    
    function filterNullish<T>(): UnaryFunction<Observable<T | null | undefined>, Observable<T>> {
      return pipe(
        filter(x => x != null) as OperatorFunction<T | null | undefined, T>
      );
    }
    
    const mockHTTPRequest = async (event: IEventTask) => {
      return (Promise.resolve().then(async () => {
        await waitFor(2000);
    
        if (event.id === '01') {
          throw new Error(`error: ${event.id}`);
        }
    
        return `result: ${event.id}`;
      }))
        .catch((error) => {
          console.error(error);
          return undefined;
        })
    }
    
    const subject = new Subject<IEventTask>();
    subject
      .pipe(
        tap(value => console.log(`${colors.blue(`starting`)} Task#${value.id} at: ${global_counter++}`)),
        mergeMap(mockHTTPRequest),
        filterNullish()
      )
      .subscribe({
        next: (result) => {
          console.log(`${colors.green(`finished`)} ${result} at: ${global_counter++}`);;
        },
        error: console.error
      });
    
    
    (async () => {
      for (let i = 1; i <= 10; i++) {
        await waitFor(125);
    
        const value: IEventTask = {
          id: (i).toString().padStart(2, '0'),
          createdTime: global_counter++
        };
    
        subject.next(value);
      }
    })();
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-07-13
      • 1970-01-01
      • 2021-11-16
      • 2021-06-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-05-10
      相关资源
      最近更新 更多