【问题标题】:How to get rid of "Closing the Kafka producer with timeoutMillis = ..."如何摆脱“使用 timeoutMillis = ... 关闭 Kafka 生产者”
【发布时间】:2021-03-31 07:03:29
【问题描述】:

我正在通过以下代码向 Kafka 代理实例发送 Apache Avro 格式的消息:

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(kafkaTopic.getTopicName(), null, null,
                avroConverter.getSchemaId().toString(), convertRecordToByteArray(kafkaRecordToSend));

        String avroSchemaName = null;

        // some of my AVRO schemas are unions, some are simple:
        if (_avroSchema.getTypes().size() == 1) {
            avroSchemaName = _avroSchema.getTypes().get(0).getName();
        } else if (_avroSchema.getTypes().size() == 2) {
            avroSchemaName = _avroSchema.getTypes().get(1).getName();
        }

        // some custom header items...
        producerRecord.headers().add(MessageHeaders.MESSAGE_ID.getText(), messageID.getBytes());
        producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SUBJECT.getText(),
                avroSchemaName.getBytes());
        producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SCHEMA_ID.getText(),
                avroConverter.getSchemaId().toString().getBytes());
        if (multiline) {
            producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_MULTILINE_RECORD_NAME.getText(),
                    MULTILINE_RECORD_NAME.getBytes());
        }

        try {
            Future<RecordMetadata> result = kafkaProducer.send(producerRecord);
            RecordMetadata sendResult = result.get();
            MessageLogger.logResourceBundleMessage(_messages, "JAPCTOAVROKAFKAPRODUCER:DEBUG0002",
                    sendResult.offset());
        } catch (Exception e) {
            MessageLogger.logError(e);
            throw e;
        }

代码运行良好,消息在 Kafka 中结束,并在 InfluxDB 中进行处理。问题是每次发送操作都会产生大量的 INFO 消息(客户端 ID 号就是一个例子):

  • [Producer clientId=producer-27902] 关闭 Kafka 生产者,timeoutMillis = 10000 毫秒。
  • [Producer clientId=producer-27902] 使用 timeoutMillis = 9223372036854775807 毫秒关闭 Kafka 生产者。
  • Kafka startTimeMs: ...
  • Kafka 提交 ID:...
  • [生产者 clientId=producer-27902] 集群 ID:

哪些垃圾邮件我们的 Graylog。

我使用类似的代码来发送 String 格式的消息。执行此代码时不会产生 INFO 消息...

ProducerRecord<String, String> recordToSend = new ProducerRecord<>(queueName, messageText);
    recordToSend.headers().add("messageID", messageID.getBytes());

    Future<RecordMetadata> result = _producerConnection.send(recordToSend);
    

我知道 INFO 消息是从 org.apache.kafka.clients.producer.KafkaProducer 类记录的。我需要删除这些消息,但我无权访问为 Graylog 定义记录器属性的 logging.mxl。

有没有办法通过 POM 条目或以编程方式消除这些消息?

【问题讨论】:

  • 那些日志来自 log4j,在它们到达 graylog 之前,您是否尝试过使用自己的 log4j 配置?
  • 不,我没有。垃圾邮件是由于我的错误/设计缺陷造成的。查看我的解决方案。

标签: java logging apache-kafka avro slf4j


【解决方案1】:

代码行为的原因是设计缺陷: 上面帖子中的代码被放置在一个方法中,该方法被调用用于将消息发送到 Kafka。 KafkaProducer 类在该方法中以及每次调用该方法时都被实例化。令人惊讶的是,KafkaProducer 不仅在调用代码的显式 close() 时发出 Closing the KafkaProducer with timeoutMillis =,而且在对实例的强引用丢失时(在我的情况下,当代码离开方法时),在后者中在这种情况下,timeoutMillis 设置为 9223372036854775807(最大长数)。

为了摆脱许多消息,我将KafkaProducer 实例化移出方法,并将实例变量设为类属性,并且我不再在send(...) 之后调用显式close()

此外,我将实例化 KafkaProducer 的类的实例更改为强引用类成员。

通过这样做,我在实例化时收到了KafkaProducer 的一些消息,然后是静默。

【讨论】:

  • 您有一个示例,说明您对代码进行了哪些更改以使其正常工作?
猜你喜欢
  • 2012-06-20
  • 2019-01-26
  • 1970-01-01
  • 2016-11-02
  • 2017-08-20
  • 2016-07-19
  • 1970-01-01
  • 2010-10-12
  • 1970-01-01
相关资源
最近更新 更多