【问题标题】:Should Consumer process messages, then send messages back to Kafka消费者是否应该处理消息,然后将消息发送回 Kafka
【发布时间】:2019-11-26 23:01:51
【问题描述】:

我想用 topic1 处理来自 Consumer 的数据,然后将消息发送回 Kafka 到 topic2

Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka

我的尝试:

 consumer.on('message', (message) => {
     let processedMsg = processMessage(message);

     payloads = [
        { topic: 'topic2', messages: processedMsg }
     ];

     producer.on('ready', function () {
         producer.send(payloads, function (err, data) {
             console.log(data);
         });
     });

     producer.on('error', function (err) {})
});

但是,Producer 无法将处理后的消息发送到 Kafka。我得到的错误

MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit

我使用节点模块Kafka-node

【问题讨论】:

    标签: node.js apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    您需要切换生产者就绪监听器和消费者消息监听器的顺序。

    否则,您将为每条消费的消息设置就绪侦听器

    例如

     producer.on('ready', function () {
        consumer.on('message', (message) => {
          let processedMsg = processMessage(message);
    
          payloads = [
           { topic: 'topic2', messages: processedMsg }
          ];
    
           producer.send(payloads, function (err, data) {
             console.log(data);
           });
     });
    

    不过,如果主要处理和转发新主题 https://github.com/nodefluent/kafka-streams/

    ,我建议您查看这个库

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-11
      • 2017-09-23
      • 1970-01-01
      • 2019-01-24
      • 2019-08-25
      • 1970-01-01
      相关资源
      最近更新 更多