Spring for Apache Kafka 官网:https://docs.spring.io/spring-kafka/docs/current/reference/html/

 

一、Spring整合Kafka

1、需要的jar包

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.6.5</version>
        </dependency>

兼容:

  • Apache Kafka Clients 2.0.0
  • Spring Framework 5.1.x
  • Minimum Java version: 8

 

2、Kafka配置类(使用@KafkaListener消费消息)

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;


/**
 * Kafka的配置类
 *
 * @author yangyongjie
 * @date 2019/10/11
 * @desc
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Value("${kafka.group.id}")
    private String groupId;

    @Value("${kafka.retries}")
    private String retries;

    /**
     * kafka消息监听器容器的工厂类
     *
     * @return
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
     // 3个KafkaMessageListenerContainer并发监听
     factory.setConcurrency(3);
       // 消费者工厂
        factory.setConsumerFactory(consumerFactory());
        ContainerProperties containerProperties = factory.getContainerProperties();
        // 当Acknowledgment.acknowledge()方法被调用即提交offset
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 调用commitAsync()异步提交
        containerProperties.setSyncCommits(false);
        return factory;
    }

    /**
     * 消费者工厂
     *
     * @return
     */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 消费者拉取消息配置
     *
     * @return
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        // kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // localhost:9092
        // groupId
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // 开启自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // 自动提交offset到zk的时间间隔,时间单位是毫秒
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // session超时设置,15秒,超过这个时间会认为此消费者挂掉,将其从消费组中移除
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //键的反序列化方式,key表示分区
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        //值的反序列化方式
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * 生产者工厂
     *
     * @return
     */
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * 生产者发送消息配置
     *
     * @return
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(8);
        // kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 消息发送确认方式
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // 消息发送重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 重试间隔时间
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        // 控制批处理大小,单位为字节
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        //键的反序列化方式,key表示分区
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        //值的反序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Kafka模版类,用来发送消息
     *
     * @return
     */
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<Integer, String>(producerFactory());
    }

}

 

 

3、发送消息

KafkaTemplate包装了producer而且提供了便捷的方法发送消息到kafka topic。

API:

ListenableFuture<SendResult<K, V>> sendDefault(V data); //sendDefault方法需要提供给一个默认的topic
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);// timestamp作为参数并将此时间戳存储在记录中
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message); //topic,partition,key等信息在message头中定义
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
 T doInKafka(Producer<K, V> producer);
}

如:

    @Autowired
    private KafkaTemplate kafkaTemplate;

    kafkaTemplate.send("test", "this is my first demo");

补充,send()方法 :

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
        return doSend(producerRecord);
    }

  send()方法返回一个ListenableFuture<SendResult<K, V>>,我们可以向Listener(监听器)注册一个回调,以异步接收发送结果,如:

   ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);

  future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                // Throwable可强制转换为 KafkaProducerException
                KafkaProducerException kafkaProducerException = (KafkaProducerException) ex;
                // 失败的记录,可以从ProducerRecord获取topic、partition、value等信息
                ProducerRecord<?, ?> producerRecord = kafkaProducerException.getProducerRecord();
                // do something
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                // 成功的记录
                ProducerRecord<Integer, String> producerRecord = result.getProducerRecord();
                // do something
            }
        });

  ps:可以写一个统一的消息发送结果回调处理器,对发送失败的消息进行统一处理。

 

 

4、消费消息

  消费消息可以通过配置一个MessageListenerContainer然后提供一个Message Listener去接收消息;或者使用@KafkaListener注解

 

 1、MessageListenerContainer

  使用消息监听器容器(MessageListenerContainer)时,必须提供一个监听器(Message Listener)以接收数据,目前提供的8个messageListener接口:

public interface MessageListener<K, V> { ❶ // 调用poll()来轮询Kafka集群的消息,并自动提交offset
 void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { ❷ // 调用poll()来轮询Kafka集群的消息,手动提交
 void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
// 调用poll()来轮询Kafka集群的消息,自动提交offset或使用提供的Consumer对象手动提交
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { ❸ void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); }
// 调用poll()来轮询Kafka集群的消息,使用提供的Consumer对象手动提交
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { ❹ void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
// 调用poll()来轮询Kafka集群的消息,自动提交offset或手动提交,AckMode.RECORD不支持
public interface BatchMessageListener<K, V> { ❺ void onMessage(List<ConsumerRecord<K, V>> data); }
// 调用poll()来轮询Kafka集群的消息,手动提交
public interface BatchAcknowledgingMessageListener<K, V> { ❻ void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); }
// 调用poll()来轮询Kafka集群的消息,自动提交offset或使用提供的Consumer对象手动提交,AckMode.RECORD不支持
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { ❼ void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { ❽ // 调用poll()来轮询Kafka集群的消息,使用提供的Consumer对象手动提交 void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }

需要注意的是:该Consumer对象不是线程安全的;您只能在调用此(Listener)侦听器的线程上调用其方法。

 

MessageListenerContainer的两个实现类:

  • KafkaMessageListenerContainer   使用单线程接收来自所有topics/partitions的所有消息

  • ConcurrentMessageListenerContainer  委托给一个或多个KafkaMessageListenerContainer实例以提供多线程消费

 

  从2.2.7版开始,您可以添加一个RecordInterceptorMessageListenerContainer中;它将在调用(listener)监听器之前被调用,以允许检查或修改记录。如果拦截器返回null,则不调用(listener)监听器。当(listener)监听器为(batch listener)批处理监听器听器时,不会调用该(listener)监听器(没有为批处理监听器提供拦截器,因为Kafka已经提供了ConsumerInterceptor),

  从2.3版开始,CompositeRecordInterceptor可以用于调用多个拦截器。

  默认情况下,使用事务时,在事务启动后将调用拦截器。从版本2.3.4开始,您可以将侦听器容器的interceptBeforeTx属性设置为在事务开始之前调用拦截器。

 

 

  使用ConcurrentMessageListenerContainer:

  其唯一的构造器和KafkaMessageListenerContainer的第一个构造器相似:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)

有个属性:private int concurrency = 1; 其值的多少表示将会创建多少个KafkaMessageListenerContainer实例

  

  1)构造器的第一个ConsumerFactory参数,Kafka使用其组管理功能将分区分配给各个consumer。

   当监听多个topic时,如需要监听3个topic,每个topic有5个分区,那么设置concurrency=15的话,将会发现只有5个激活的Consumers,每个Consumer从每个topic中分配了一个分区,其他10个则处于空闲状态。这是因为Kafka默认的PartitionAssignor(分区分配器)是RangeAssignor,上面的情况应该使用RoundRobinAssignor来代替。RoundRobinAssignor将会将分区分配给所有的Consumers,这样每个Consumer会分得一个topic的一个分区。想要切换PartitionAssignor,使用ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

  2)第二个ContainerProperties参数,当使用 TopicPartitionOffset  配置 ContainerProperties时,ConcurrentMessageListenerContainer 委派 KafkaMessageListenerContainer 实例去分配TopicPartitionOffset 实例。也就是说,如果TopicPartitionOffset提供了 6个实例,且 concurrency 为3。每个 KafkaMessageListenerContainer 获得两个分区。如果TopicPartitionOffset提供了 5个实例,且 concurrency 为3,那么前两个 container各获得两个分区,最后一个container获得1个分区。

  如果 concurrency 数量大于TopicPartitions,那么concurrency则会自动向下调整以使每个container获得一个分区

 

  Committing Offsets

  consumer poll()方法将会返回一个或多个ConsumerRecords,每个ConsumerRecord都将会调用一次MessageListener。对于每个ConsumerRecord都提供了多个选择去提交offsets,当enable.auto.commit设置为true(2.3之前默认为true,2.3及之后默认为false)的时候,Kafka将自动提交offsets,如果为false的话,有以下几种AckMode(消息确认)方式:

RECORD 提交offset当监听器处理完每个ConsumerRecord后
BATCH 提交offset当监听器处理完poll()返回的所有的ConsumerRecord
TIME 提交offset当监听器处理完poll()返回的所有的ConsumerRecord并且距离上次提交的时间超过了ackTime
COUNT 提交offset当监听器处理完poll()返回的所有的ConsumerRecord并且距离上次提交已经接收了超过ackCount个ConsumerRecord
COUNT_TIME 提交offset当监听器处理完poll()返回的所有的ConsumerRecord,当ackTime或者ackCount成立时
MANUAL 在Acknowledgment.acknowledge()中使用,和BATCH相似
MANUAL_IMMEDIATE 当Acknowledgment.acknowledge()方法被调用即提交offset

  默认的提交方式为BATCH,默认使用commitSync()同步提交。在ContainerProperties中修改

  调用commitSync()同步提交或者commitAsync()异步提交,取决于syncCommits的配置,默认为true。

 

  MessageListenerContainer自动启动

  MessageListenerContainer 接口继承了 SmartLifecycle 接口,默认是自动启动的,因此消息监听者容器将在稍晚阶段(Integer.MAX-VALUE - 100)启动,其他实现SmartLifecycle以处理来自侦听器的数据的组件,应该在更早的阶段启动。

 

 

  使用MessageListenerContainer这种方式在上面的KafkaConfig配置类中加上如下配置:

 

 1)KafkaMessageListenerContainer

  使用监听容器工厂,创建监听容器:

   /**
     * KafkaMessageListenerContainer,也可使用ConcurrentMessageListenerContainer
     * @param myKafkaMessageListener 自定义的消息监听器
     * @return
     */
    @Bean
    public KafkaMessageListenerContainer<Integer, String> getContainer(MyKafkaMessageListener myKafkaMessageListener) {
        ContainerProperties containerProperties = new ContainerProperties("test");
        containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);

        //使用自定义的MessageListener
        containerProperties.setMessageListener(myKafkaMessageListener);
        containerProperties.setGroupId("bssout");
        containerProperties.setClientId("myListener");

        KafkaMessageListenerContainer<Integer, String> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);

        return kafkaMessageListenerContainer;
    }

  MyKafkaMessageListener :

@Component
public class MyKafkaMessageListener extends MyAbstractKafkaMessageListener  {

    private static final Logger LOGGER= LoggerFactory.getLogger(MyKafkaMessageListener.class);

    @Override
    public void onMessage(ConsumerRecord consumerRecord, Consumer consumer) {
        String topic=consumerRecord.topic();
        String value= (String) consumerRecord.value();
        LOGGER.info("my_value={}",value);
        consumer.commitAsync();
    }
}

  MyAbstractKafkaMessageListener(因为接口需要实现所有方法,所以不想实现的方法用抽象类来实现) :

public abstract class MyAbstractKafkaMessageListener implements ConsumerAwareMessageListener {

    @Override
    public void onMessage(Object o) {
        return;
    }
}

 

  2)ConcurrentMessageListenerContainer

/**
     * ConcurrentMessageListenerContainer
     *
     * @param myKafkaMessageListener 自定义的消息监听器
     * @return
     */
    @Bean
    public ConcurrentMessageListenerContainer<Integer, String> concurrentMessageListenerContainer(MyKafkaMessageListener myKafkaMessageListener) {
        // 容器属性,监听的topics
        ContainerProperties containerProps = new ContainerProperties("test", "custom-kafka");

        //使用自定义的MessageListener
        containerProps.setMessageListener(myKafkaMessageListener);
        
        ConcurrentMessageListenerContainer<Integer, String> concurrentMessageListenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProps);
        // 并发的监听容器数
        concurrentMessageListenerContainer.setConcurrency(3);
        return concurrentMessageListenerContainer;
    }

 

 

 2、使用@KafkaListener注解

  @KafkaListener 注解用于将Bean 方法指定为监听器容器中的监听器。

  该Bean被包装在MessagingMessageListenerAdapter中,MessagingMessageListenerAdapter配置了各种功能,例如用于在必要时转换数据以匹配方法参数的转换器

 

  @KafkaListener 使用示例:

@Component
public class KafkaListenerTest {

    private static final Logger LOGGER= LoggerFactory.getLogger(KafkaListenerTest.class);

    /**
     * id为consumerId
     * @param record
     */
    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record,Acknowledgment acknowledgment) {
        LOGGER.info("test-value={}",record.value());
        LOGGER.info("test-topic={}",record.topic());
      // 手动提交offset
      acknowledgment.acknowledge();

    }
}

 

 另外这种方式要求在一个有@Configuration注解的配置类上,同时使用@EnableKafka注解,以及一个监听器容器工厂(配置的bean名称为kafkaListenerContainerFactory ),该监听器容器工厂用于配置 ConcurrentMessageListenerContainer。如:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000); // 设置容器属性
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

 @EnableKafka注解向容器中导入了 KafkaBootstrapConfiguration 配置类,此配置类中注入了KafkaListenerAnnotationBeanPostProcessor @KafkaListener注解的后置处理器

/**
 * Enable Kafka listener annotated endpoints that are created under the covers by a
 * {@link org.springframework.kafka.config.AbstractKafkaListenerContainerFactory
 * @see KafkaListener
 * @see KafkaListenerAnnotationBeanPostProcessor
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistrar
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaBootstrapConfiguration.class)
public @interface EnableKafka {
}

  @KafkaListener注解由KafkaListenerAnnotationBeanPostProcessor类解析,其实现了BeanPostProcessor接口,并在postProcessAfterInitialization方法内解析@KafkaListener注解。

@Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<Method>();
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

                        @Override
                        public Set<KafkaListener> inspect(Method method) {
                            Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                            return (!listenerMethods.isEmpty() ? listenerMethods : null);
                        }

                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
                }
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
                }
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
View Code

相关文章: