【发布时间】:2018-07-22 22:16:53
【问题描述】:
我正在使用 Apache Camel 2.20.2(连同 Spring Boot 1.5.8)将通过 MQTT 传入的消息路由到其他两个服务,一个是 HTTP/SOAP(工作正常),另一个是 Apache Kafka。
在一夜之间以每分钟大约 320 条消息的初始运行后,我注意到该过程变得非常缓慢。经过一些分析,我发现 Kafka 路由产生了内存泄漏(确定禁用了 HTTP 路由)。
@Component
public class Router extends RouteBuilder {
@Autowired
ApplicationProperties param;
@Override
public void configure() throws Exception {
logger.info("Starting MqttKafkaBridgeApplication with:\n" + param.toString());
// MQTT Consumer
from("mqtt:vernemq?"
+ "host=tcp:MqttHost:MqttPort...")
.transform(body().convertToString())
.log("Recieved : "+body().convertToString())
.multicast()
.stopOnException().to( "direct:kafka");
// Kafka Producer
from("direct:kafka")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
exchange.getIn().setHeader(KafkaConstants.KEY, "1");
}
})
.to("kafka:" + "kafkaTopic" +
"?brokers=kafkaHost:KafkaPort;
}
}
我是骆驼的新手,但据我所知,我的配置很简单? 我可以看到消息到达 Kafka 集群,所以不知道为什么没有释放内存?
【问题讨论】:
-
也许我读错了,但这不是真正的 Kafka 对象而不是 Camel Route 部分吗?
标签: java spring-boot apache-kafka apache-camel