【Question Title】:How to set Spring Kafka consumer max attempts when using Schema Registry
【Posted】:2018-09-28 03:59:52
【Question Description】:

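I am developing a Spring Boot server with Spring Kafka (1.3.2.RELEASE), Apache Avro (1.8.2), and io.confluent's Schema Registry (3.1.2). Whenever the Kafka listener receives a message, it reads the schema id from the message and fetches the Avro schema from the registry server by that id. The problem is that if the Schema Registry server is down, my listener keeps sending HTTP requests to the registry to fetch the Avro schema each time it receives the message (printing a large number of error logs), and this blocks all subsequent Kafka messages because the offset never advances.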

16:56:41.541 ERROR KafkaMessageListenerContainer$ListenerConsumer -  - org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 - Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition trade-0 at offset 810845
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 21
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
        at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
        at sun.net.www.http.HttpClient.New(HttpClient.java:339)
        at sun.net.www.http.HttpClient.New(HttpClient.java:357)
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1202)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1138)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1032)
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:966)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1546)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1474)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:153)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:323)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:316)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:63)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:118)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:918)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1095)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:614)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)

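I tried setting the maximum number of attempts with RetryTemplate, but it did not work; it seems RetryTemplate only takes effect inside my listener method. I also could not find any useful configuration on Confluent's website.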

【Comments】:

    Tags: apache-kafka avro spring-kafka confluent-schema-registry


    【Solution 1】:

    For now I replace KafkaAvroDeserializer with a CustomAvroDeserializer, which extends KafkaAvroDeserializer and overrides its deserialize method, wrapping the body in a try-catch, as shown below:

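    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import lombok.extern.log4j.Log4j;

    @Log4j
    public class CustomAvroDeserializer extends KafkaAvroDeserializer {

      @Override
      public Object deserialize(String topic, byte[] bytes) {
        try {
          return this.deserialize(bytes);
        } catch (Exception e) {
          // Log4j 1.x does not expand "{}" placeholders; pass the exception as the second argument.
          log.error("Failed to deserialize message via Schema Registry", e);
          // Returning null gives the listener a null value instead of failing the poll,
          // so the consumer can move past this record.
          return null;
        }
      }
    }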
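
The deserializer above gives up on the first failure. If a bounded number of attempts is wanted, as the question asks, one option is a small retry helper called from inside `deserialize`. The following is only a sketch using the JDK: `BoundedAttempts`, `callOrNull`, and the `maxAttempts` and `backoffMillis` parameters are hypothetical names for illustration, not part of Spring Kafka or the Confluent client.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (not a Spring Kafka or Confluent API): retry an action a bounded number of times. */
public final class BoundedAttempts {

  private BoundedAttempts() {}

  /**
   * Runs the action up to maxAttempts times and returns its first successful result,
   * or null once every attempt has failed (mirroring the "return null" choice above).
   */
  public static <T> T callOrNull(Callable<T> action, int maxAttempts, long backoffMillis) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (InterruptedException e) {
        // Preserve the interrupt flag and stop retrying.
        Thread.currentThread().interrupt();
        return null;
      } catch (Exception e) {
        if (attempt == maxAttempts) {
          return null; // last attempt failed: give up
        }
        try {
          Thread.sleep(backoffMillis); // fixed pause before the next attempt
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return null;
        }
      }
    }
    return null; // not reached when maxAttempts >= 1; required by the compiler
  }
}
```

Inside `CustomAvroDeserializer.deserialize`, the try-catch body could then become something like `return BoundedAttempts.callOrNull(() -> this.deserialize(bytes), 3, 500L);`, where 3 and 500 are arbitrary example values. Because the retries block the consumer thread during `poll`, the total wait should probably stay well below the consumer's `max.poll.interval.ms`, and the listener still has to handle a null value.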
    

    【Comments】:
