【问题标题】:Spring Integration Kafka threading configSpring Integration Kafka 线程配置
【发布时间】:2015-04-17 21:30:40
【问题描述】:

我正在使用带有以下配置的 spring-integration-kafka 1.1.0。我不太了解streams 配置。当我增加这个时,Spring 会自动产生更多线程来处理消息吗?例如当我有streams=2 时,相关变压器和服务激活器是否都在 2 个线程中运行?我觉得缺少一些线程执行器配置,但不确定如何。任何提示表示赞赏。谢谢。

<int:poller default="true" fixed-delay="10"/>

<int:channel id="tag.track">
</int:channel>

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapterForTagTrack" kafka-consumer-context-ref="consumerContextForTagTrack" auto-startup="true" channel="tag.track">
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContextForTagTrack"
	consumer-timeout="${kafka.consumer.timeout}" zookeeper-connect="zookeeperConnect">
	<int-kafka:consumer-configurations>
		<int-kafka:consumer-configuration group-id="${kafka.consumer.group.track}" max-messages="200">
			<int-kafka:topic id="tag.track" streams="2" />
		</int-kafka:consumer-configuration>
	</int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<int:channel id="tag.track.transformed">
	<int:interceptors>
		<int:wire-tap channel="event.logging" />
	</int:interceptors>
</int:channel>

<int:transformer id="kafkaMessageTransformerForTagTrack"
	ref="kafkaMessageTransformer" input-channel="tag.track" method="transform"
	output-channel="tag.track.transformed" />

<int:service-activator input-channel="tag.track.transformed" ref="tagTrackMessageHandler" method="handleTagMessage">
	<int:request-handler-advice-chain>
		<ref bean="userTagRetryAdvice" />
	</int:request-handler-advice-chain>
</int:service-activator>

尝试了消息驱动通道适配器,但无法使其工作,以下配置不会接收任何消息。还尝试了org.springframework.integration.kafka.listener.KafkaTopicOffsetManager,它抱怨Offset management topic cannot have more than one partition。另外,在这个适配器中,如何配置消费者组? 有没有关于如何使用消息驱动通道适配器的详细示例? project page上的指令水平很高。

<int:channel id="tag.track">
	<int:queue capacity="100"/>
</int:channel>

<bean id="kafkaConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
	<constructor-arg ref="zookeeperConnect"/>
</bean>

<bean id="connectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
	<constructor-arg ref="kafkaConfiguration"/>
</bean>

<bean id="decoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder"/>

<int-kafka:message-driven-channel-adapter
		id="adapter"
		channel="tag.track"
		connection-factory="connectionFactory"
		key-decoder="decoder"
		payload-decoder="decoder"
		max-fetch="100"
		topics="tag.track"
		auto-startup="true"
		/>

【问题讨论】:

    标签: spring-integration apache-kafka


    【解决方案1】:

    streams 属性与 Spring 本身无关;它只是在调用ConsumerConnector.createMessageStreams() 时传递给 Kafka(每个主题/流条目都在 map 参数中传递)。

    请参阅kafka documentation

    编辑

    使用高级消费者时,轮询kafka入站通道适配器,因此下游集成流运行的线程与kafka客户端线程无关;它们在轮询器配置中进行管理。

    您可以考虑改用消息驱动的通道适配器。

    【讨论】:

    • 根据 kafka 文档,流的数量等于消费者线程的数量。我在想,当我在通道适配器上创建 2 个使用者线程(streams=2)时,下游通道转换器服务激活器组件是否都在 2 个线程中运行,或者我必须显式配置线程执行器?
    • 感谢您的指导,加里。我试过了,但无法让它工作。在问题中粘贴了我的尝试。
    • 我建议你看一些test cases
    猜你喜欢
    • 2015-12-12
    • 2017-07-30
    • 1970-01-01
    • 2021-03-20
    • 2020-07-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多