【发布时间】:2022-01-23 22:29:04
【问题描述】:
我正在使用 NestJS 实现微服务架构,我正在使用使用 NestJS 的 CLIENT 应用程序,
客户端应用接收到一个休息请求并发送到Kafka获取结果
try {
const pattern = 'findDoctors';
const payload = body;
const doctors = await lastValueFrom(
this.client.send(pattern, payload).pipe(timeout(50000)),
);
return doctors;
} catch (e) {
console.log(e);
throw new Error(e);
}
而在微服务端(目前是混合应用,稍后会移除其余的api)
@MessagePattern('findDoctors')
findDoctors(@Payload() message): any {
return this.doctorService.searchForDoctors(message.value);
}
async searchForDoctors(data): Promise<any[]> {
this.logger.info('Started search for doctors job');
throw 'Not Found';
}
在我抛出异常后,我会进入日志
node:17720) UnhandledPromiseRejectionWarning: TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be of type string or an instance of Buffer, ArrayBuffer, or Array or an Array-like Object. Received an instance of Object
at Function.from (buffer.js:330:9)
at ServerKafka.assignErrorHeader (C:\Users\Desktop\Projects\node_modules\@nestjs\microservices\server\server-kafka.js:137:73)
at ServerKafka.sendMessage (C:\Users\Desktop\Projects\node_modules\@nestjs\microservices\server\server-kafka.js:119:14)
at C:\Users\Desktop\Projects\node_modules\@nestjs\microservices\server\server-kafka.js:81:31
at C:\Users\Desktop\node_modules\@nestjs\microservices\server\server.js:46:31
at processTicksAndRejections (internal/process/task_queues.js:79:11)
(node:17720) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 3)
(node:17720) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
而客户端只是等待超时,永远不会收到微服务的异常响应
尝试使用 RpcException 但它是一样的
【问题讨论】:
-
你希望 Kafka 从 Producer 客户端“返回”什么?
-
“未找到”错误,我尝试使用 throw new RpcException('Invalid credentials.');也是,但它只适用于 return new RpcException 而不是 throw 所以我不能使用过滤器
-
我的意思是卡夫卡生产者不会“搜索”。您将发送一个插入某个主题的请求。在其他地方需要一个 consumer client 来读取该搜索请求事件,并对其进行一些处理......但是,Kafka 并不是请求-响应模型的合适替代品,它会是如果您使用由医生 ID/姓名索引的实际数据库,则首选,例如
-
可以分享
server-kafka.js的全部内容吗?或者至少告诉我们错误中提到的第 137 行是什么? -
这更像是一个 Nestjs 和 typescript 问题,但使用的是 Kafka,所以这不是这里的主要目的,谢谢
标签: node.js typescript nestjs