【发布时间】:2022-03-03 23:54:51
【问题描述】:
我正在尝试为使用 kafka 流将数据从 kafka 发送到 websocket 连接的应用程序编写 Junit 测试。我已经能够在本地运行应用程序并发布到该主题并查看在套接字连接上返回的数据。但是我正在努力正确设置测试环境。
要测试的代码
当前我有这个作为我要测试的方法:
public Source<JsonNode, ?> getKafkaStream(String groupId, Set<Id> idsBelongingToUser) {
return Consumer.plainSource(getKafkaConsumerSettings(groupId), Subscriptions.topics(this.kafkaTopic))
.map(ConsumerRecord::value)
.filter(event -> idsBelongingToUser.contains(event.getId())
.map(consumerRecord -> objectMapper.convertValue(consumerRecord, JsonNode.class));
}
然后我在这里的流程中使用这个来源:
public Flow<JsonNode, JsonNode, ?> getDataStream(String groupId, Set<Id> idsBelogingToUser) {
return Flow.fromSinkAndSource(
Sink.ignore(),
this.getKafkaStream(accountOfListener, idsBelogingToUser).runWith(BroadcastHub.of(JsonNode.class), this.materializer)
);
}
我尝试的测试
我最近的尝试是使用 TestcontainersKafkaJunit4Test 测试代码 这是我如何设计的修改文件。
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings;
import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
import akka.stream.testkit.javadsl.TestSink;
import akka.testkit.javadsl.TestKit;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import java.util.concurrent.CompletionStage;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
@TestInstance(Lifecycle.PER_CLASS)
public class KafkaTest extends TestcontainersKafkaJunit4Test {
KafkaService toTest;
String topic;
private static final ActorSystem system = ActorSystem.create("TestkitTestcontainersTest");
KafkaTest() {
super(
system,
KafkaTestkitTestcontainersSettings.create(system)
.withSchemaRegistry(true)
.withNumBrokers(1)
.withInternalTopicsReplicationFactor(1)
);
this.setUpAdminClient();
}
@SneakyThrows
public <T> T getResultFromFuture(CompletionStage<T> future) {
await().atMost(5, SECONDS).until(() -> future.toCompletableFuture().isDone());
return future.toCompletableFuture().get();
}
@Test
void test() {
this.topic = createTopic();
toTest = new KafkaService();
JsonNode event1 = createRandomEvent();
JsonNode event2 = createRandomEvent();
String group = "some-key";
//produce some messages
getResultFromFuture(this.produce(this.topic, new StringSerializer(), new JsonSerdes<>(JsonNode.class).serializer(),
Pair.create(group, event1),
Pair.create(group, event2)));
//Here I want to test the source is getting the messages produced above
//One Attempt
toTest.getKafkaStream(group, getSetOfIds())
.runWith(TestSink.probe(system), materializer)
.expectNext(event1)
.expectNext(event2);
}
JsonNode createRandomEvent() { ... }
@AfterAll
void shutdownActorSystem() {
this.cleanUpAdminClient();
TestKit.shutdownActorSystem(system);
}
}
我相信设置接近可以正常工作,因为当我从使用源流更改为使用消息并使用消费者对象使用它们时,我可以看到事件。我尝试在 map(ConsumerRecord::value) 函数中打印,但没有日志让我相信它永远不会被执行。不幸的是,alpakka kafka 没有很多关于如何测试它的示例,作为 kafka 和 scala 的初学者,我正在努力将现有的示例应用到我的代码中。我已使用以下链接进行设置: alpakka kafka testing docs akka streams testing
我稍微修改了代码以使其更通用。如果有任何信息不清楚,我将解释/编辑问题。提前感谢您的任何帮助/建议
【问题讨论】:
标签: java apache-kafka playframework testcontainers alpakka