【问题标题】:Consume Batches through Kafka in Spring boot在 Spring Boot 中通过 Kafka 消费批次
【发布时间】:2020-07-09 04:25:21
【问题描述】:

我是 Kafka 新手,想通过消费者启用批量处理。

通读documentation,发现从3.0版本开始我们可以启用批处理。

目前我们正在为 kafka 使用Spring Boot 2.1.3.RELEASE 及以下依赖项:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

在开始属性和代码更改之前,我需要在 pom.xml 中进行哪些更改?我需要更改 Springboot 版本吗?

【问题讨论】:

    标签: java spring-boot maven apache-kafka spring-cloud-stream


    【解决方案1】:

    您可以使用@StreamListener 将其作为批处理使用。你只需要给一个反序列化器。示例:

    你只需要提供一个反序列化器。

    public class Person {
    
        private String name;
        private String surname;
        .........
    }
    
    
       @StreamListener(value = PersonStream.INPUT)
        private void personBulkReceiver(List<Person> person) {
            System.out.println("personBulkReceiver : " + person.size());
        }
    
    
    spring:
      cloud:
        stream:
          kafka:
          binders:
            bulkKafka:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        binder:
                          brokers: localhost:9092
                          configuration:
                            max.poll.records: 1500
                            fetch.min.bytes: 1000000
                            fetch.max.wait.ms: 10000
                            value.deserializer: tr.cloud.stream.examples.PersonDeserializer
          bindings:
            person-topic-in:
              binder: bulkKafka
              destination: person-topic
              contentType: application/person
              group : omercelik
              consumer:
                batch-mode: true
    
    public class PersonDeserializer extends JsonDeserializer<Person> {
    }
    

    【讨论】:

      【解决方案2】:

      您需要 Boot 2.3.1 和云 Hoxton.SR6。

      批处理模式仅支持函数式编程风格,不支持@StreamListtener

      【讨论】:

      • 此博客是否导致错误信息? medium.com/@vijaya.cigala/…
      • 你没有得到有效载荷转换;仅当您 useNativeDecoding 使用 Kafka 反序列化器转换有效负载时,它才会起作用。
      猜你喜欢
      • 1970-01-01
      • 2016-12-09
      • 1970-01-01
      • 1970-01-01
      • 2018-02-18
      • 2020-04-24
      • 2021-05-06
      • 2019-08-25
      • 1970-01-01
      相关资源
      最近更新 更多