【问题标题】:Can a bean depend on another bean only for tests in Spring?一个 bean 能否仅在 Spring 中依赖另一个 bean 进行测试?
【发布时间】:2018-09-28 22:39:24
【问题描述】:

我正在使用 Spring 进行依赖注入,并且我有一个恰好是 Kafka 生产者服务的 bean,它通过属性文件获取它的配置,如 zookeeper 服务器等。

import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Service
public class KafkaProducerService implements InitializingBean {

    @Autowired
    private Properties properties;

    private KafkaProducer<String, String> producer;
    private ZkUtils zkUtils;

    public KafkaProducerService() {
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.bootstrap.servers"));
        kafkaProducerProperties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, properties.getProperty("kafka.producer.timeout", "3000"));
        kafkaProducerProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, properties.getProperty("kafka.reconnect.backoff.ms", "1000"));
        kafkaProducerProperties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, properties.getProperty("kafka.producer.timeout", "3000"));
        kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        String zookeeperEndpoint = properties.get("zookeeper.connect") + ":2181";
        this.producer = new KafkaProducer<>(kafkaProducerProperties);
        final ZkClient zkClient = new ZkClient(zookeeperEndpoint, 10000, 10000, ZKStringSerializer$.MODULE$);
        zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperEndpoint), false);
    }

    public boolean publishMessage(final String message, final String topic) {
        try {
            producer.send(new ProducerRecord<>(topic, message))
                    .get(3, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return false;
        }
    }

    public void tearDown() {
        this.producer.close();
    }
}

我通过将其自动装配到其他服务中来使用此服务,并且在运行应用程序和使用它时效果很好。对于加载嵌入式 Kafka 的测试,我有额外的 spring 上下文。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <context:annotation-config/>
    <context:component-scan base-package="my.project.main"/>

    <context:property-placeholder location="classpath*:my_properties.properties"/>

    <bean id="embeddedKafka" class="my.project.main.EmbeddedKafka"
          init-method="setupEmbeddedKafkaWithZookeeper"
          destroy-method="tearDown"/>

    <bean id="properties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="zookeeper.connect">localhost</prop>
                <prop key="kafka.bootstrap.servers">localhost:9092</prop>
                <prop key="acks">1</prop>
                <prop key="kafka.producer.timeout">5000</prop>
                <prop key="kafka.reconnect.backoff.ms">30000</prop>
            </props>
        </property>
    </bean>
</beans>

每当我运行测试时,kafka 服务生产者都应该使用通过 EmbeddedKafka bean 加载的内存中的 kafka。问题在于,在大多数情况下,嵌入式 kafka 启动需要太多时间,因此 kafka 生产者超时并且无法被 Spring 实例化。是否有任何机制可以让 KafkaProducerService bean“等待”直到 EmbeddedKafka bean 被实例化?

【问题讨论】:

  • 当然可以。检查Spring Test
  • 我阅读了您指出的资源,但我看不到任何关于 bean 之间依赖关系的信息。你能详细说明你的答案吗?
  • 好的,简化一下。对于测试建议,您可以提供测试特定的弹簧配置,并用仅测试 bean 替换普通 bean。检查stackoverflow.com/questions/28605833/…

标签: java spring dependency-injection


【解决方案1】:

有一些方法可以让生产者等待。但我猜你不想永远等待。任何等待都会有一些超时(比如说 X 秒),以防万一嵌入式 kafka 配置不正确或发生其他问题,从而测试不会永远挂起。

您可以将生产者的超时设置为 X 秒,这样就可以解决问题。

如果你确实想永远等待,请耐心等待。

您的主要目标是确保在首次访问 zookeeper/kafka 之前 kafka 已启动。

如果在 spring 上下文初始化期间没有发生这种情况(您可以在超时时在堆栈跟踪中检查这一点),那么您唯一需要确保的是嵌入式 kafka init 是同步完成的。

例如,您可以创建将调用 setupEmbeddedKafkaWithZookeeper 的新 bean 并等待 kafka 启动。

如果在 spring 上下文初始化期间访问 zookeeper/kafka,那就更棘手了。你要么需要

  1. 等到 kafka 在发生的那一刻启动
  2. 或者在 spring 上下文创建之前初始化 kafka。

为了等待 kafka,您可以为 KafkaProducerService 创建一个包装器,该包装器将在所有访问 kafka/zookeeper` 的方法中等待 kafka 启动。

或者,您可以通过创建自己的继承(或包装)spring runner 的运行器来初始化嵌入式 kafka,该运行器将在 spring 上下文创建之前进行初始化。

【讨论】:

  • 问题是kafka和zookeeper需要在调用KafkaProducerService的afterPropertiesSet方法之前启动,因为我们尝试连接到zookeeper。增加超时的想法是首先想到的,但这有点不稳定,因为在不同的机器/环境上启动 kafka 需要不同的时间。您的回答中仍有好主意,我会尝试一下,谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-09-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-06-18
相关资源
最近更新 更多