【问题标题】:Kafka: Manually producing record triggers exception in consumerKafka:在消费者中手动生成记录触发异常
【发布时间】:2020-05-23 23:00:04
【问题描述】:

我有一个从多个主题进行投票的消费者。 到目前为止,我只使用 Java 对这些主题进行了记录,并且一切正常。

我使用 avro 的 confulent 工具。

现在我尝试通过终端手动生成主题。

我创建了一个 avro-producer,其架构与其他生产者使用的模式相同:

# Produce a record with one field
kafka-avro-console-producer \
  --broker-list 127.0.0.1:9092 --topic order_created-in \
  --property schema.registry.url=http://127.0.0.1:8081 \
  --property value.schema='{"type":"record","name":"test","fields":[{"name":"name","type":"string"},{"name":"APropertie","type":{"type":"array","items":{"type":"record","name":"APropertie","fields":[{"name":"key","type":"string"},{"name":"name","type":"string"},{"name":"date","type":"string"}]}}}]}'

之后我生成了一条遵循指定模式的记录:

{"name": "order_created", "APropertie": [{"key": "1", "name": "testname", "date": "testdate"}]}

记录被正确地生成到主题。 但是我的 AvroConsumer 抛出异常:

Polling
Polling
Polling
Polling
Polling
Polling
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition order_created-in-0 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class test specified in writer's schema whilst finding reader's schema for a SpecificRecord.

Process finished with exit code 1

有什么提示吗? 谢谢!

【问题讨论】:

  • 没有使用过avro,但是“找不到作者架构中指定的类测试”听起来像是在尝试反序列化为类测试(在架构中的记录name 中使用)。
  • 是的。你知道这到底有什么问题吗?在“原始”模式中,我也使用了“测试”。
  • 好的,我认为控制台生产者无法访问架构中提到的 APropertieAEvent 类。这可能是一个原因。

标签: java apache-kafka avro


【解决方案1】:

这与生产者/消费者的配置有关。

普通生产者有这样的配置:

        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");

Avro 通常会添加以下属性:

        // avro part
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
        properties.setProperty("confluent.value.schema.validation", "true");
        properties.setProperty("confluent.key.schema.validation", "true");

这些必须包含在控制台生产者中。

【讨论】:

    猜你喜欢
    • 2016-06-10
    • 2019-11-21
    • 2021-04-25
    • 1970-01-01
    • 2012-07-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多