【问题标题】:Creating partition for topic in kafka-node在 kafka-node 中为主题创建分区
【发布时间】:2020-01-06 22:06:46
【问题描述】:

我创建了一个 HighLevelProducer 来将消息发布到主题流,该主题流将由 ConsumerGroupStream 使用 kafka-node 消费。当我从同一个 ConsumerGroup 创建多个消费者以从同一个主题消费时,只创建一个分区并且只有一个消费者正在消费。我还尝试定义该主题的分区数,尽管我不确定是否需要在创建主题时定义它,如果需要,我需要提前多少个分区。此外,是否可以将对象推送到 Transform 流而不是字符串(我目前使用 JSON.stringify 因为否则我在消费者中得到 [Object object]。

const myProducerStream = ({ kafkaHost, highWaterMark, topic }) => {
    const kafkaClient = new KafkaClient({ kafkaHost });
    const producer = new HighLevelProducer(kafkaClient);
    const options = {
        highWaterMark,
        kafkaClient,
        producer
    }; 

    kafkaClient.refreshMetadata([topic], err => {
        if (err) throw err; 
    }); 

    return new ProducerStream(options);
};

const transfrom = topic => new Transform({
    objectMode: true,
    decodeStrings: true,
    transform(obj, encoding, cb) {
        console.log(`pushing message ${JSON.stringify(obj)} to topic "${topic}"`);

        cb(null, {
            topic,
            messages: JSON.stringify(obj)
        });
    }
});

const publisher = (topic, kafkaHost, highWaterMark) => {
    const myTransfrom = transfrom(topic);
    const producer = myProducerStream({ kafkaHost, highWaterMark, topic });

    myTransfrom.pipe(producer);

    return myTransform;
};

消费者:


const createConsumerStream = (sourceTopic, kafkaHost, groupId) => {
    const consumerOptions = {
        kafkaHost,
        groupId,
        protocol: ['roundrobin'],
        encoding: 'utf8',
        id: uuidv4(),
        fromOffset: 'latest',
        outOfRangeOffset: 'earliest',
    };

    const consumerGroupStream = new ConsumerGroupStream(consumerOptions, sourceTopic);

    consumerGroupStream.on('connect', () => {
        console.log(`Consumer id: "${consumerOptions.id}" is connected!`);
    });

    consumerGroupStream.on('error', (err) => {
        console.error(`Consumer id: "${consumerOptions.id}" encountered an error: ${err}`);
    });

    return consumerGroupStream; 
};

const publisher = (func, destTopic, consumerGroupStream, kafkaHost, highWaterMark) => { 
    const messageTransform = new AsyncMessageTransform(func, destTopic);

    const resultProducerStream = myProducerStream({ kafkaHost, highWaterMark, topic: destTopic })

    consumerGroupStream.pipe(messageTransform).pipe(resultProducerStream);
}; 

【问题讨论】:

    标签: apache-kafka node-kafka


    【解决方案1】:

    对于第一个问题: 组中的最大工作消费者等于分区数。

    因此,如果您的 TopicA 有 1 个分区,并且您的使用者组中有 5 个使用者,其中 4 个将处于空闲状态。

    如果您的 TopicA 有 5 个分区,并且您的使用者组中有 5 个使用者,那么所有使用者都将处于活动状态并使用您的主题中的消息。

    要指定分区的数量,您应该从 CLI 创建主题,而不是期望 Kafka 在您首次发布消息时创建它。

    创建具有特定分区数的主题:

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
    

    更改已存在主题中的分区数:

    bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic test 
           --partitions 40 
    

    请注意,您只能增加分区数,不能减少分区数。

    请参考Kafka文档https://kafka.apache.org/documentation.html

    另外,如果您想了解更多关于 Kafka 的信息,请查看免费书籍https://www.confluent.io/resources/kafka-the-definitive-guide/

    【讨论】:

    • 是否可以从您的 nodejs 应用程序中指定分区数?在 kafka-node 文档中,他们执行了以下方法: var topicsToCreate = [{ topic: 'topic1', partitions: 1, replicationFactor: 2 }] client.createTopics(topicsToCreate, (error, result) => { // result is如果无法创建给定主题的任何错误数组});虽然这对我不起作用。
    • 没有工作,因为您在回调函数中有错误?或使用默认分区数创建?还有你使用的是哪个版本的 kafka?
    • 我正在使用最新的 kafka 版本。当我创建一个生产者流并且它只产生一个主题分区时会出现问题:stackoverflow.com/questions/59627078/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-31
    • 2018-08-05
    • 1970-01-01
    • 1970-01-01
    • 2015-11-29
    • 2016-10-01
    相关资源
    最近更新 更多