【问题标题】:NestJS/RxJS - Is there an easier way to "observe" one time?NestJS/RxJS - 有没有一种更简单的方法来“观察”一次?
【发布时间】:2021-07-24 15:35:57
【问题描述】:

我是 NestJS 和 RxJS 的新手。我非常努力地用惯用的 RxJS 编写东西,因为这个项目的目的之一是更好地学习它们,而不是试图绕过或破解它们的 API。

无论如何,所以我有一个逻辑,我正在从 OAuth2 服务器查找 JWKSet。我为此使用 NestJS HttpService,它返回一个 Observable。然后我使用该 observable 将结果设置为 ReplaySubject。然后,在我的 JwtStrategy 中,我使用 secretOrKeyProvider 函数订阅 ReplaySubject 以便在每次 HTTP 请求进入时获取值。

现在,我确信这种方法有很多问题,因为我几乎不了解如何使用 RxJS。我最大的问题是最后一部分。当我在 secretOrKeyProvider 函数中订阅 ReplaySubject 时,我立即取消订阅。这是因为我想清理剩余的订阅。

总的来说,这对我来说感觉很不对,订阅并立即退订。我觉得我做错了什么。我正在寻求对此代码的审查,以了解我可以做出哪些改进。

虽然下面的所有代码都有效,但我的目标是引导我更好、更正确地使用 RxJS。

@Injectable()
export class JwkService implements OnModuleInit, OnModuleDestroy {
    private readonly logger = new Logger(JwkService.name);

    readonly key = new ReplaySubject<string>();
    private subscription: Subscription;

    constructor(
        private httpService: HttpService,
        private configService: ConfigService
    ) {}

    onModuleInit(): void {
        this.subscription = this.httpService
            .get(`${this.configService.get<string>(AUTH_SERVER_HOST)}${jwkUri}`)
            .pipe(
                map((res: AxiosResponse<JwkSet>) => jwkToPem(res.data.keys[0]))
            )
            .subscribe({
                next: (key) => this.key.next(key),
                error: (error: Error) => {
                    this.logger.error(
                        'CRITICAL ERROR: Unable to load JWKSet',
                        ajaxErrorHandler(error)
                    );
                }
            });
    }

    onModuleDestroy(): void {
        if (this.subscription) {
            this.subscription.unsubscribe();
        }
    }
}

@Injectable()
export class JwtStrategy extends PassportStrategy(Strategy) {
    private readonly logger = new Logger(JwtStrategy.name);

    constructor(
        private readonly jwkService: JwkService
    ) {
        super({
            jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
            ignoreExpiration: false,
            secretOrKeyProvider: (
                req: Request,
                rawJwt: string,
                done: doneFn
            ) => {
                jwkService.key
                    .subscribe({
                        next: (value: string) => done(null, value),
                        error: (error: Error) => {
                            this.logger.error(
                                'Error getting JWK key',
                                ajaxErrorHandler(error)
                            );
                            done(error);
                        }
                    })
                    .unsubscribe();
            }
        });
    }
}

【问题讨论】:

    标签: rxjs nestjs


    【解决方案1】:

    如果您不需要unsubscribe,RxJS 会为您提供一些管道:

         .get(`${this.configService.get<string>(AUTH_SERVER_HOST)}${jwkUri}`)
            .pipe(
                take(1),
                map((res: AxiosResponse<JwkSet>) => jwkToPem(res.data.keys[0]))
            )
         ...
    

         .get(`${this.configService.get<string>(AUTH_SERVER_HOST)}${jwkUri}`)
            .pipe(
                first(),
                map((res: AxiosResponse<JwkSet>) => jwkToPem(res.data.keys[0]))
            )
         ...
    

    first(), take(1) 如果取第一个值,管道会为您取消订阅。如果在任何情况下都没有价值,那么您可以手动取消订阅,或者只是完成主题。

    【讨论】:

    • 这很好。还有一个问题,错误处理呢? Pipe 似乎无法做到这一点,除非我遗漏了什么。
    • 你可以使用 catchError 管道,然后 throwError(e => new Error(e)) 作为一个简单的解决方案。并在订阅中解析它: subscribe({ next: () => { .. 通常 subscribe fn }, error: () => { .. execute on throwError }, complete: () => { do something finally })
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-01
    • 2012-06-15
    相关资源
    最近更新 更多