【问题标题】:How to efficiently produce messages out of a collection to Kafka如何有效地将消息从集合中生成到 Kafka
【发布时间】:2020-02-13 16:00:41
【问题描述】:

在我的 Scala (2.11) 流应用程序中,我正在使用 IBM MQ 中一个队列中的数据并将其写入具有一个分区的 Kafka 主题。使用 MQ 中的数据后,消息有效负载被拆分为 3000 条较小的消息,这些消息存储在字符串序列中。然后使用 KafkaProducer 将这 3000 条消息中的每一条发送到 Kafka(2.x 版)。

你将如何发送这 3000 条消息?

我无法增加 IBM MQ 中的队列数量(不受我的控制),也无法增加主题中的分区数量(需要对消息进行排序,并且编写自定义分区器会影响过多的主题消费者) .

生产者设置当前是:

  • acks=1
  • linger.ms=0
  • batch.size=65536

但优化它们可能是它自己的问题,而不是我当前问题的一部分。

目前,我正在做

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
    val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
    val recordMetadata = future.get()
}

在我看来,这似乎不是最优雅和最有效的方式。有没有增加吞吐量的程序化方法?


@radai 回答后编辑

感谢答案为我指明了正确的方向,我仔细研究了不同的 Producer 方法。 Kafka - The Definitive Guide 一书列出了这些方法:

即发即弃 我们向服务器发送一条消息,并不关心它是否成功到达。大多数情况下,它会成功到达,因为 Kafka 是高可用的,生产者会自动重试发送消息。但是,使用此方法会丢失一些消息。

同步发送 我们发送一条消息,send() 方法返回一个 Future 对象,我们使用 get() 等待未来,看看 send() 是否成功。

异步发送 我们使用回调函数调用 send() 方法,当它被触发时 收到来自 Kafka 代理的响应

现在我的代码看起来像这样(省略了错误处理和回调类的定义):

  val asyncProducer = new KafkaProducer[String, String](someProperties)

  for (msg <- messages) {
    val record = new ProducerRecord[String, String](someTopic, someKey, msg)
    asyncProducer.send(record, new compareProducerCallback)
  }
  asyncProducer.flush()

我已经比较了 10000 条非常小的消息的所有方法。这是我的测量结果:

  1. 即发即弃:173683464ns

  2. 同步发送:29195039875ns

  3. 异步发送:44153826ns

说实话,通过选择正确的属性(batch.size、linger.ms、...)来优化所有这些可能更有潜力。

【问题讨论】:

  • 你使用什么 API 可以用字符串构造一个 KafkaProducer?!
  • 没有 IBM MQ Kafka Connect 源吗?那你就不用写Scala了github.com/ibm-messaging/kafka-connect-mq-source
  • 好吧,您当前的代码非常好,但如果您想阻止发送每条消息,则可能不行。还有更高级别的库,如 Akka、Fs2 或 ZIO,它们可以帮助实现更多功能的 Scala 模式
  • @mike 这个连接器是由对 MQ 有深入了解的人编写的,我希望它能够处理大多数情况。我建议您在 MQ 连接器上打开一个问题来描述您的问题。

标签: scala apache-kafka kafka-producer-api


【解决方案1】:

我认为您的代码运行缓慢的最大原因是您在等待每次发送的未来。

kafka 旨在发送批次。通过一次发送一条记录,您正在等待每条记录的往返时间,并且您没有从压缩中获得任何好处。

“惯用”的做法是发送所有内容,然后在第二个循环中阻塞所有生成的期货。

另外,如果您打算这样做,我会推迟 linger 备份(否则您的第一条记录会产生一批大小为 1 的记录,总体上会减慢您的速度。请参阅https://en.wikipedia.org/wiki/Nagle%27s_algorithm)并致电生产商 flush()一旦你的发送循环完成。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-03-31
    • 2021-04-24
    • 2018-10-16
    • 1970-01-01
    • 1970-01-01
    • 2020-09-20
    • 1970-01-01
    • 2019-12-22
    相关资源
    最近更新 更多