【发布时间】:2015-04-17 21:30:40
【问题描述】:
我正在使用带有以下配置的 spring-integration-kafka 1.1.0。我不太了解streams 配置。当我增加这个时,Spring 会自动产生更多线程来处理消息吗?例如当我有streams=2 时,相关变压器和服务激活器是否都在 2 个线程中运行?我觉得缺少一些线程执行器配置,但不确定如何。任何提示表示赞赏。谢谢。
<int:poller default="true" fixed-delay="10"/>
<int:channel id="tag.track">
</int:channel>
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapterForTagTrack" kafka-consumer-context-ref="consumerContextForTagTrack" auto-startup="true" channel="tag.track">
</int-kafka:inbound-channel-adapter>
<int-kafka:consumer-context id="consumerContextForTagTrack"
consumer-timeout="${kafka.consumer.timeout}" zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration group-id="${kafka.consumer.group.track}" max-messages="200">
<int-kafka:topic id="tag.track" streams="2" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int:channel id="tag.track.transformed">
<int:interceptors>
<int:wire-tap channel="event.logging" />
</int:interceptors>
</int:channel>
<int:transformer id="kafkaMessageTransformerForTagTrack"
ref="kafkaMessageTransformer" input-channel="tag.track" method="transform"
output-channel="tag.track.transformed" />
<int:service-activator input-channel="tag.track.transformed" ref="tagTrackMessageHandler" method="handleTagMessage">
<int:request-handler-advice-chain>
<ref bean="userTagRetryAdvice" />
</int:request-handler-advice-chain>
</int:service-activator>
尝试了消息驱动通道适配器,但无法使其工作,以下配置不会接收任何消息。还尝试了org.springframework.integration.kafka.listener.KafkaTopicOffsetManager,它抱怨Offset management topic cannot have more than one partition。另外,在这个适配器中,如何配置消费者组?
有没有关于如何使用消息驱动通道适配器的详细示例? project page上的指令水平很高。
<int:channel id="tag.track">
<int:queue capacity="100"/>
</int:channel>
<bean id="kafkaConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
<constructor-arg ref="zookeeperConnect"/>
</bean>
<bean id="connectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
<constructor-arg ref="kafkaConfiguration"/>
</bean>
<bean id="decoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder"/>
<int-kafka:message-driven-channel-adapter
id="adapter"
channel="tag.track"
connection-factory="connectionFactory"
key-decoder="decoder"
payload-decoder="decoder"
max-fetch="100"
topics="tag.track"
auto-startup="true"
/>
【问题讨论】:
标签: spring-integration apache-kafka