【问题标题】:nest bull separate process for queues and api用于队列和 api 的嵌套公牛单独进程
【发布时间】:2020-03-27 03:46:35
【问题描述】:

我有一个 nestjs 应用程序,它暴露了一些 REST APIs。其中一个APIs 触发了处理某些任务的作业。问题是当作业被触发时,应用程序停止服务 REST 请求,这导致负载均衡器的健康检查失败。我按照README末尾给出的方法启动了一个单独的子进程来处理作业。但是,该作业不会在子进程中启动,并且 API 请求会停止。

这是我的工作:

import {
  BullQueueEvents,
  OnQueueActive,
  OnQueueEvent,
  Process,
  Processor,
} from 'nest-bull';
import { Job } from 'bull';
import { Logger } from '@nestjs/common';
import { AService } from './a-service';
import { AJobInterface } from '../AJobInterface';

@Processor({ name: 'a_queue' })
export class AJob {
  private readonly logger = new Logger('AQueue');

  constructor(private readonly service: AService) {}

  @Process({
    name: 'app',
    concurrency: 1
  })
  processApp(job: Job<AJobInterface>) {
    console.log('CHILD: ', process.pid);
    const { jobId } = job.data;
    return this.service.process(jobId);
  }

  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(
      `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(
        job.data,
      )}...`,
    );
  }

  @OnQueueEvent(BullQueueEvents.COMPLETED)
  onCompleted(job: Job) {
    this.logger.log(
      `Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
    );
  }
}

这是我的 app.module.ts:

import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { DatabaseModule } from './db/module';
import { BullModule } from 'nest-bull';
import { AJob } from './worker/a-job';
import { AService } from './worker/a-service';
import { join } from 'path';

@Module({
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register({
      name: 'a_queue',
      processors: [ join(__dirname, 'worker/a-job.js') ],
      options: {
        redis: {
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          showFriendlyErrorStack: true,
        },
        settings: {
          lockDuration: 300000,
          stalledInterval: 300000
        },
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService, AJob, AService],
})
export class AppModule implements OnModuleInit {
  onModuleInit() {
    console.log('MAIN: ', process.pid);
  }
}

我做错了什么吗?

【问题讨论】:

    标签: node.js typescript nestjs bull.js


    【解决方案1】:

    很抱歉这么晚才发布答案。事实证明,在子进程中设置工作人员是不可能的。我最终得到了一个单独的 worker.module.ts 和一个单独的 worker.ts 并为 API 和工作程序创建了两个单独的进程。

    worker.module.ts:

    import { Module, OnModuleInit } from '@nestjs/common';
    import { TypeOrmModule } from '@nestjs/typeorm';
    import { AppService } from '../app.service';
    import { DatabaseModule } from '../db/module';
    import { BullModule } from 'nest-bull';
    import { AJob } from './a-job';
    import { AService } from './a-service';
    import { join } from 'path';
    import { Job, DoneCallback } from 'bull';
    
    @Module({
      imports: [
        TypeOrmModule.forRoot(),
        DatabaseModule,
        BullModule.register({
          name: 'a_queue',
          processors: [ (job: Job, done: DoneCallback) => { done(null, job.data); } ],
          options: {
            redis: {
              host: process.env.REDIS_URL || '127.0.0.1',
              port: 6379,
              password: process.env.REDIS_PWD,
              showFriendlyErrorStack: true,
            },
            settings: {
              lockDuration: 300000,
              stalledInterval: 300000
            },
          },
        }),
      ],
      providers: [AppService, AJob, AService],
    })
    export class WorkerModule implements OnModuleInit {
      onModuleInit() {
        console.log('WORKER: ', process.pid);
      }
    }
    

    worker.ts:

    import { NestFactory } from '@nestjs/core';
    import { WorkerModule } from './worker/worker.module';
    
    async function bootstrap() {
      const app = await NestFactory.create(WorkerModule);
      app.init();
    }
    
    bootstrap();
    

    虽然app.module.ts 现在看起来像这样:

    //...imports
    @Module({
      imports: [
        TypeOrmModule.forRoot(),
        DatabaseModule,
        BullModule.register({
          name: 'a_queue',
          processors: [ ],
          options: {
            redis: {
              host: process.env.REDIS_URL || '127.0.0.1',
              port: 6379,
              showFriendlyErrorStack: true,
            },
          },
        }),
      ],
      controllers: [AppController],
      providers: [AppService],
    })
    export class AppModule implements OnModuleInit {
      onModuleInit() {
        console.log('MAIN: ', process.pid);
      }
    }
    

    以及对应的app.ts

    import { NestFactory } from '@nestjs/core';
    import { AppModule } from './app.module';
    import { port } from './config';
    
    async function bootstrap() {
      const app = await NestFactory.create(AppModule);
      app.enableCors();
      await app.listen(port);
    }
    
    bootstrap();
    

    【讨论】:

    • 愚蠢的问题,但是:你如何从下一个 cli 开始 worker.tsapp.ts?我的意思是,如果我执行npm start,则只执行 main.ts。我找不到如何将入口点传递给嵌套 cli。像npm start -entry-point worker.ts 之类的东西,或者您是否移至nest-cli 中的“monorepo”配置。谢谢
    • 嗨@JesusMonzonLegido,您可以使用 pm2 或 supervisor 之类的东西来启动下一个流程
    • @ujwaldhakal 谢谢。是的,我已经将它与 pm2 集成。对于开发,我有 2 个 nest-cli.json 具有不同 "entryFile" 的文件。我以NODE_ENV=development nest start --watch --config nest-cli-worker.json之类的开头
    猜你喜欢
    • 2019-12-19
    • 1970-01-01
    • 2019-10-08
    • 2023-03-12
    • 2019-09-06
    • 1970-01-01
    • 2021-06-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多