【发布时间】:2021-03-31 07:03:29
【问题描述】:
我正在通过以下代码向 Kafka 代理实例发送 Apache Avro 格式的消息:
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(kafkaTopic.getTopicName(), null, null,
avroConverter.getSchemaId().toString(), convertRecordToByteArray(kafkaRecordToSend));
String avroSchemaName = null;
// some of my AVRO schemas are unions, some are simple:
if (_avroSchema.getTypes().size() == 1) {
avroSchemaName = _avroSchema.getTypes().get(0).getName();
} else if (_avroSchema.getTypes().size() == 2) {
avroSchemaName = _avroSchema.getTypes().get(1).getName();
}
// some custom header items...
producerRecord.headers().add(MessageHeaders.MESSAGE_ID.getText(), messageID.getBytes());
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SUBJECT.getText(),
avroSchemaName.getBytes());
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SCHEMA_ID.getText(),
avroConverter.getSchemaId().toString().getBytes());
if (multiline) {
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_MULTILINE_RECORD_NAME.getText(),
MULTILINE_RECORD_NAME.getBytes());
}
try {
Future<RecordMetadata> result = kafkaProducer.send(producerRecord);
RecordMetadata sendResult = result.get();
MessageLogger.logResourceBundleMessage(_messages, "JAPCTOAVROKAFKAPRODUCER:DEBUG0002",
sendResult.offset());
} catch (Exception e) {
MessageLogger.logError(e);
throw e;
}
代码运行良好,消息在 Kafka 中结束,并在 InfluxDB 中进行处理。问题是每次发送操作都会产生大量的 INFO 消息(客户端 ID 号就是一个例子):
- [Producer clientId=producer-27902] 关闭 Kafka 生产者,timeoutMillis = 10000 毫秒。
- [Producer clientId=producer-27902] 使用 timeoutMillis = 9223372036854775807 毫秒关闭 Kafka 生产者。
- Kafka startTimeMs: ...
- Kafka 提交 ID:...
- [生产者 clientId=producer-27902] 集群 ID:
哪些垃圾邮件我们的 Graylog。
我使用类似的代码来发送 String 格式的消息。执行此代码时不会产生 INFO 消息...
ProducerRecord<String, String> recordToSend = new ProducerRecord<>(queueName, messageText);
recordToSend.headers().add("messageID", messageID.getBytes());
Future<RecordMetadata> result = _producerConnection.send(recordToSend);
我知道 INFO 消息是从 org.apache.kafka.clients.producer.KafkaProducer 类记录的。我需要删除这些消息,但我无权访问为 Graylog 定义记录器属性的 logging.mxl。
有没有办法通过 POM 条目或以编程方式消除这些消息?
【问题讨论】:
-
那些日志来自 log4j,在它们到达 graylog 之前,您是否尝试过使用自己的 log4j 配置?
-
不,我没有。垃圾邮件是由于我的错误/设计缺陷造成的。查看我的解决方案。
标签: java logging apache-kafka avro slf4j