【发布时间】:2019-06-04 18:50:45
【问题描述】:
我在我的 Spring Boot 应用程序中创建了一个 kafka 消费者,它监听一个主题 - my_topic 并在读取事件 my_event 时执行一些操作。现在我正在这样做:
@KafkaListener(topics = "my_topic",
containerFactory = "my_kafka_container_factory")
public void handleMyKafkaEvents(String eventJson) {
MyDTO my_dto = gson.fromJson(eventJson, MyDTO.class);
String event_type = my_dto.getEventType();
if (event_type != null && event_type.equals("my_event")) {
// do something with my_dto
}
}
// dto object
public class MyDTO {
private String status;
private String eventType;
private String propName;
// some other parameters
// getters and setters
}
我的 kafka 主题中的对象的样例:
{
"eventType": "my_event",
"propName": "prop_value",
"status": "DONE",
//some_other_key_value_pairs_required_in_my_DTO
//some_other_key_value_pairs_not_required_in_my_DTO
}
由于我的侦听器正在侦听推送到 kafka 主题的所有数据,因此我必须在读取每条记录后添加一个条件,如果它的 eventType 是我需要的,那么我正在对其执行一些操作。
目前正在运行。由于会有其他数据推送到它的 eventType 不是我需要的,这些将被忽略,但只有在阅读它们之后,因为我不知道如何根据这个 eventType 进行过滤。
所以我的问题是,当推送到 kafka 主题的事件数量突然激增时,不仅是我的 eventType,还有其他事件,它会影响我的服务性能吗?
我可以在这里改进什么,以便忽略其他事件类型,我的听众甚至不必知道它们。
【问题讨论】:
-
肯定会“影响性能”,毕竟你是在做事。例如,您可以为每个事件类型创建一个主题。
-
是的,我知道它会影响我的服务,我的问题是我如何才能在到达我的服务之前过滤掉不必要的事件。
-
您可以使用 Kafka 流先过滤,然后处理。您必须为过滤结果创建另一个主题,我不确定它实际上是否更高效。如果没有,至少您可以在上游的单独服务中执行此操作并根据您的需要进行扩展。
标签: java spring-boot apache-kafka spring-kafka