【发布时间】:2020-12-20 23:20:19
【问题描述】:
尝试向 Kafka 发送 Avro 格式的消息并使用它。直到经过一些研究添加 Thread.sleep(16000) 以便生产者等待消息之后,它才发送消息。但是它再次停止工作。是org.apache.kafka.common.protocol.Errors - Unexpected error code: 87. Failed to produce messages to topic。
有什么建议吗?下面是我的代码
public class AvroAutomationTest3IT {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroAutomationTest3IT.class);
private static Properties producerProps;
private final String topic = "topic.one";
String schemaPath = "src/test/resources/automation-tests/sample-avro.avsc";
// subject convention is "<topic-name>-value"
String subject = topic + "-value";
// avsc json string.
String schema = null;
// kafka broker list.
private static String brokers = "xxx:9093";
// schema registry url.
private static String registry = "xxx:8081";
private static Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
@BeforeAll
public static void setUp() throws IOException {
producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
producerProps.put("acks", "1");
producerProps.put("reconnect.backoff.ms", "5000");
producerProps.put("retry.backoff.ms", "1000");
producerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry);
//producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
producerProps.put(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
// producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
// producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(CLIENT_ID_CONFIG, "AvroProducer");
// producerProps.put(ProducerConfig.ACKS_CONFIG, "0");
// producerProps.put(ProducerConfig.RETRIES_CONFIG, "0");
//configure the KafkaAvroSerializer
producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "1");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
//consumer properties
producerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
producerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
producerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
producerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "NewConsumer");
producerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
producerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
//sslConfig
producerProps.put("security.protocol", "SSL");
producerProps.put("ssl.truststore.location", "C:/Users/xx/truststore.jks");
producerProps.put("ssl.truststore.password", "expeditors");
producerProps.put("ssl.keystore.location", "C:/Users/xx/xx.jks");
producerProps.put("ssl.keystore.password", "xxx");
producerProps.put("ssl.key.password", "xxx");
}
@Test
public void avroTest() throws Exception {
sendMessage();
Thread.sleep(16000);
readMessage();
}
public void readMessage() {
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(producerProps);
consumer.subscribe(Collections.singletonList(topic));
try {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(15000));
// assertEquals(2, records.count(), "Expected 2 record");
for (ConsumerRecord<String, byte[]> record : records) {
try {
JsonElement el = this.parseAvroMessage(topic, record.value());
System.out.printf("offset = %d, value = %s\n", record.offset(), el);
} catch (Exception ex) {
ex.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.commitAsync();
consumer.close(Duration.ofMillis(3000));
}
}
private JsonElement parseAvroMessage(String topic, byte[] value) {
HashMap<String, String> configs = new HashMap<>();
configs.put("schema.registry.url", registry);
KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
deserializer.configure(configs, true);
Object obj = deserializer.deserialize(topic, value);
return gson.fromJson(obj.toString(), JsonElement.class);
}
public void sendMessage() throws IOException {
// construct kafka producer.
Producer<Integer, GenericRecord> producer = new KafkaProducer<Integer, GenericRecord>(producerProps);
// message key.
// int userIdInt = 1;
// message value, avro generic record.
GenericRecord record = this.buildRecord();
// send avro message to the topic page-view-event.
producer.send(new ProducerRecord<Integer, GenericRecord>("visibility.avro.topic.source.one", null, record));
// producer.flush();
}
public GenericRecord buildRecord() throws IOException {
// avsc json string.
String schemaString = null;
FileInputStream inputStream = new FileInputStream(schemaPath);
try {
schemaString = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
} finally {
inputStream.close();
}
// avro schema.
Schema schema = new Schema.Parser().parse(schemaString);
GenericRecord metadata = new GenericData.Record(schema.getField("metadata").schema());
metadata.put("version", "1");
metadata.put("eventName", "event.name");
GenericRecord data = new GenericData.Record(schema.getField("data").schema());
data.put("name", "Bob");
data.put("age", 25);
GenericRecord record = new GenericData.Record(schema);
record.put("metadata", metadata);
record.put("data", data);
return record;
}
}
【问题讨论】:
-
我建议删除所有不必要的代码并提供一个最小的可重现示例,以便我们有机会缩小错误范围。理想情况下,您还将清楚地将消费者属性与生产者属性分开,并删除注释掉的代码。你能说出究竟是哪一行引发了错误吗?
-
注意:您不应该将生产者和消费者配置添加到同一个属性对象。 Kafka 还包括 MockProducer 和 MockConsumer,因此您不需要使用真实的代理或注册表来测试序列化和发送数据
-
另外...你真的需要 Avro 和 JSON吗?你为什么要
toString你的 Avro 对象? -
@OneCriketeer。只是为了在我编写真正的测试之前能够查看输出以确认消费者正在拉动某些东西。我认为 Mock Producer 和 MockConsumer 不会与 Avro Registery 一起使用,我必须这样做才能生成 Avro 消息。至少我还没有找到使用 Avro 的方法。
标签: java apache-kafka avro kafka-producer-api