【发布时间】:2020-02-13 16:00:41
【问题描述】:
在我的 Scala (2.11) 流应用程序中,我正在使用 IBM MQ 中一个队列中的数据并将其写入具有一个分区的 Kafka 主题。使用 MQ 中的数据后,消息有效负载被拆分为 3000 条较小的消息,这些消息存储在字符串序列中。然后使用 KafkaProducer 将这 3000 条消息中的每一条发送到 Kafka(2.x 版)。
你将如何发送这 3000 条消息?
我无法增加 IBM MQ 中的队列数量(不受我的控制),也无法增加主题中的分区数量(需要对消息进行排序,并且编写自定义分区器会影响过多的主题消费者) .
生产者设置当前是:
- acks=1
- linger.ms=0
- batch.size=65536
但优化它们可能是它自己的问题,而不是我当前问题的一部分。
目前,我正在做
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
val recordMetadata = future.get()
}
在我看来,这似乎不是最优雅和最有效的方式。有没有增加吞吐量的程序化方法?
@radai 回答后编辑
感谢答案为我指明了正确的方向,我仔细研究了不同的 Producer 方法。 Kafka - The Definitive Guide 一书列出了这些方法:
即发即弃 我们向服务器发送一条消息,并不关心它是否成功到达。大多数情况下,它会成功到达,因为 Kafka 是高可用的,生产者会自动重试发送消息。但是,使用此方法会丢失一些消息。
同步发送 我们发送一条消息,send() 方法返回一个 Future 对象,我们使用 get() 等待未来,看看 send() 是否成功。
异步发送 我们使用回调函数调用 send() 方法,当它被触发时 收到来自 Kafka 代理的响应
现在我的代码看起来像这样(省略了错误处理和回调类的定义):
val asyncProducer = new KafkaProducer[String, String](someProperties)
for (msg <- messages) {
val record = new ProducerRecord[String, String](someTopic, someKey, msg)
asyncProducer.send(record, new compareProducerCallback)
}
asyncProducer.flush()
我已经比较了 10000 条非常小的消息的所有方法。这是我的测量结果:
即发即弃:173683464ns
同步发送:29195039875ns
异步发送:44153826ns
说实话,通过选择正确的属性(batch.size、linger.ms、...)来优化所有这些可能更有潜力。
【问题讨论】:
-
你使用什么 API 可以用字符串构造一个 KafkaProducer?!
-
没有 IBM MQ Kafka Connect 源吗?那你就不用写Scala了github.com/ibm-messaging/kafka-connect-mq-source
-
好吧,您当前的代码非常好,但如果您想阻止发送每条消息,则可能不行。还有更高级别的库,如 Akka、Fs2 或 ZIO,它们可以帮助实现更多功能的 Scala 模式
-
@mike 这个连接器是由对 MQ 有深入了解的人编写的,我希望它能够处理大多数情况。我建议您在 MQ 连接器上打开一个问题来描述您的问题。
标签: scala apache-kafka kafka-producer-api