【发布时间】:2017-04-22 12:09:15
【问题描述】:
在恢复所有主题的分区时,我会不断收到调试消息。如下所示。此消息每毫秒在我的服务器上连续打印一次。
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-7
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-6
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-9
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-8
这个 这是代码
val zookeeperHost = "localhost"
val zookeeperPort = "9092"
// Kafka queue settings
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(zookeeperHost + ":" + zookeeperPort)
.withGroupId((groupName))
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
// Streaming the Messages from Kafka queue
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
.map(msg => {
consumed(msg.record.value)
})
.runWith(Sink.ignore)
请帮助正确进行分区以停止调试消息。
【问题讨论】:
-
你有没有连续调用KafkaConsumer.resume方法?
-
我没有调用 KafkaConsumer.resume 方法。我在我的主课上调用了一次
Consumer.committableSource。 -
我需要为 Akka-Kafka 做分区配置吗?提前致谢!
标签: scala akka apache-kafka kafka-consumer-api akka-stream