【Posted】:2020-06-30 03:07:30
【Question】:
My Node.js application uses the kafka-node module.
I have a Kafka topic with three partitions, as shown below:
Topic: NotifierTemporary PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: NotifierTemporary Partition: 0 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1001,1003,1002
Topic: NotifierTemporary Partition: 1 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1001,1003
Topic: NotifierTemporary Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1002,1001
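When I write a series of keyed messages to the topic, they all appear to be written to the same partition. I expected some of the messages with different keys to be sent to partitions 1 and 2.
Here is the log output for several messages, printed from the consumer's onMessage event handler: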
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":345,"partition":0,"highWaterOffset":346,"key":"66","timestamp":"2020-03-19T00:16:57.783Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":346,"partition":0,"highWaterOffset":347,"key":"222","timestamp":"2020-03-19T00:16:57.786Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":347,"partition":0,"highWaterOffset":348,"key":"13","timestamp":"2020-03-19T00:16:57.791Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":348,"partition":0,"highWaterOffset":349,"key":"316","timestamp":"2020-03-19T00:16:57.798Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":446,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":349,"partition":0,"highWaterOffset":350,"key":"446","timestamp":"2020-03-19T00:16:57.806Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":350,"partition":0,"highWaterOffset":351,"key":"66","timestamp":"2020-03-19T00:17:27.918Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":351,"partition":0,"highWaterOffset":352,"key":"222","timestamp":"2020-03-19T00:17:27.920Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":352,"partition":0,"highWaterOffset":353,"key":"13","timestamp":"2020-03-19T00:17:27.929Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":353,"partition":0,"highWaterOffset":354,"key":"316","timestamp":"2020-03-19T00:17:27.936Z"}
Here is the kafka-node producer code that sends the messages:
  /**
   * @description Adds a notification message to the Kafka topic that is not saved in a database.
   * @param {Int} recipientId - accountId of recipient of notification message
   * @param {String} subject - subject line of the notification message
   * @param {Object} message - message payload to send to recipient
   */
  async sendTemporaryNotification(recipientId, subject, message) {
    const notificationMessage = {
      recipient: recipientId,
      subject,
      message,
    };
    // we need to validate this message schema - this will throw if invalid
    Joi.assert(notificationMessage, NotificationMessage);
    // partition based on the recipient
    const payloads = [
      { topic: KAFKA_TOPIC_TEMPORARY, messages: JSON.stringify(notificationMessage), key: notificationMessage.recipient },
    ];
    if (this.isReady) {
      await this.producer.sendAsync(payloads);
    }
    else {
      throw new ProducerNotReadyError('Notifier Producer not ready');
    }
  }
} // closes the enclosing class (its opening is not shown in this excerpt)
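As you can see, none of the messages come from partition 1 or 2. This holds even after sending messages with random integer keys continuously for several minutes. What might I be doing wrong?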
【Comments】:
- Can you debug the producer library to see which partitioning strategy it is using?
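To make the partitioning-strategy question concrete, here is a minimal sketch of how a key-hash partitioner typically picks a partition. It is an illustration only, not kafka-node's source code: `hashKey` and `pickPartition` are hypothetical names, and the 31-based string hash is an assumption chosen for simplicity. A strategy that ignores the key would not spread messages this way, which would be consistent with every message landing in partition 0.

```javascript
// Illustrative sketch only: NOT kafka-node's source code.
// A keyed partitioner hashes the message key and takes the result modulo
// the number of partitions, so equal keys always map to the same partition.

// Hypothetical helper: a simple 31-based string hash, kept non-negative.
function hashKey(key) {
  const text = String(key); // integer keys are hashed by their string form
  let hash = 0;
  for (let i = 0; i < text.length; i++) {
    hash = (hash * 31 + text.charCodeAt(i)) & 0x7fffffff;
  }
  return hash;
}

// Hypothetical helper: choose a partition id from the list for a given key.
function pickPartition(partitions, key) {
  return partitions[hashKey(key) % partitions.length];
}

// Keys taken from the log in the question; partition ids from the topic description.
const partitions = [0, 1, 2];
for (const key of [66, 222, 13, 316, 446]) {
  console.log(`key ${key} -> partition ${pickPartition(partitions, key)}`);
}
```

With a strategy like this, the five keys in the log would not all share one partition. As far as I recall, the kafka-node producer constructor takes a `partitionerType` option (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4) and the default type does not look at the key; treat that as an assumption and check it against the README of the installed version.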
Tags: apache-kafka kafka-consumer-api kafka-producer-api