【问题标题】:Impossible to read Kafka message value无法读取 Kafka 消息值
【发布时间】:2018-10-27 18:06:52
【问题描述】:

我正在使用一个简单的生产者来在 kafka 中生成消息。 此消息是一个 json 对象。

这是我制作人的代码:

const kafka = require('kafka-node');
  const Producer = kafka.Producer;
  const KeyedMessage = kafka.KeyedMessage;
  const Offset = kafka.Offset;
  const Client = kafka.Client;
  const p = 0;
  const a = 0;
  const topic = 'claims';

  const client = new Client('192.168.99.100:2181');
  const producer = new Producer(client, { requireAcks: 1 });

  producer.on('error', function (err) {
    done.fail(err);
  });

  producer.on('ready', function () {
    const message = {status: 'Created', id_claims: '12345'};

    producer.send([
      { topic: topic, partition: p, messages: message, attributes: a }
    ], function (err, result) {
      expect(result).not.toBeNull();
      expect(err).toBeNull();
      done();
    });
  });

但是当我尝试阅读此消息的内容时,我总是得到

[对象对象]

这是我的消费者的代码:

'use strict';

const util = require('util')

const kafka = require('kafka-node');
const Consumer = kafka.Consumer;
const Offset = kafka.Offset;
const Client = kafka.Client;
const topic = 'claims';

const client = new Client('192.168.99.100:2181');
const topics = [
    {topic: topic, partition: 0}
];
const options = { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 };

const consumer = new Consumer(client, topics, options);
const offset = new Offset(client);

consumer.on('message', function (message) {
  console.log(JSON.stringify(message.value));
  console.log("Value: %j", message.value);
  console.log("Value: %o", message.value);
  console.log('Value: ' + util.inspect(message.value, 10));
  console.log(util.inspect(message.value, {showHidden: false, depth: null}));
});

consumer.on('error', function (err) {
  console.log(err);
});

我在处理程序中尝试的每个方法都返回相同的结果:[Object object]

【问题讨论】:

    标签: json node.js apache-kafka kafka-consumer-api


    【解决方案1】:

    问题出在下面的代码中:

    producer.send([
          { topic: topic, partition: p, messages: message, attributes: a }
        ], function (err, result) {
      expect(result).not.toBeNull();
      expect(err).toBeNull();
      done();
    });
    

    从如下所示的documentation,您不能将object 提供给messages。向消息提供JSON.stringfy(message) 将起作用。

    {
       topic: 'topicName',
       messages: ['message body'], // multi messages should be a array, single message can be just a string or a KeyedMessage instance
       key: 'theKey', // string or buffer, only needed when using keyed partitioner
       partition: 0, // default 0
       attributes: 2, // default: 0
       timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10 and KafkaClient only)
    }
    

    【讨论】:

      【解决方案2】:

      您的生产者消息可能需要具有以下值:

      JSON.stringify(message)
      

      【讨论】:

      • 虽然你是对的,但你应该给出一点解释。 :-)
      猜你喜欢
      • 2016-10-08
      • 2018-01-12
      • 2021-04-13
      • 1970-01-01
      • 1970-01-01
      • 2015-01-26
      • 1970-01-01
      • 2018-01-09
      • 1970-01-01
      相关资源
      最近更新 更多