【发布时间】:2015-04-09 08:27:46
【问题描述】:
使用 spring-integration-kafka 扩展和以下配置:
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="10000"
zk-session-timeout="10000" zk-sync-time="2000" />
<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration
group-id="realtime-services-consumer-grp"
value-decoder="purchaseDecoder"
key-decoder="kafkaReflectionDecoder"
max-messages="5" >
<int-kafka:topic id="purchase" streams="1" />
</int-kafka:consumer-configuration>
<int-kafka:consumer-configuration
group-id="realtime-services-consumer-gw"
value-decoder="eventDecoder"
key-decoder="kafkaReflectionDecoder"
max-messages="10" >
<int-kafka:topic id="event" streams="1" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int-kafka:inbound-channel-adapter
id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
auto-startup="true" channel="inputFromKafka">
<int:poller fixed-delay="20" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>
例如,当我评论第一个 consumer-configuration 时,我每分钟可以有 300 个事件,没有问题。但是当两者都被激活时。我的吞吐量非常低。来自这两个主题的总吞吐量低于每分钟 50 个。
有人知道为什么我在阅读 2 个主题时表现如此糟糕吗?我在配置中做错了什么?
【问题讨论】:
-
你的
kafkaReflectionDecoder和其他人是什么?他们不能成为瓶颈吗? -
我的消息中的key总是空的,另一个是avro解码器。问题是当我有两个消费者时会发生瓶颈,而我在解码器中没有同步。
-
你介意看看
<int-kafka:message-driven-adaptep>,直到我们发现发生了什么? spring.io/blog/2015/02/09/… -
@ArtemBilan 在我看来,这种消费消息的方式是循环方式,因为我在主题购买中的消息很少,但有很多消息事件,看起来它被卡住了大部分时间从购买轮询。让事件没有机会被消耗。我的 CPU 平均为 0%。购买并没有滞后,而事件有很大的滞后。
标签: spring spring-integration apache-zookeeper apache-kafka