【问题标题】:How to send messages synchronously in kafka?在kafka中发送同步消息?
【发布时间】:2018-01-13 17:17:59
【问题描述】:

如何在 kafka 中发送同步消息?
实现它的一种方法是设置属性参数
max.in.flight.requests.per.connection = 1

但我想知道是否有在 kafka 中发送同步消息的直接或替代方式。 (类似于 producer.syncSend(...) 等)。

【问题讨论】:

  • 你能解释一下你为什么想要那个吗?如果这是关于消息排序保证,这可能会更复杂一些(并且同步发送它们并不会真正改变那里的事情)。
  • 同一生产者发送到同一主题中同一分区的消息将保留该顺序。跨多个分区、主题或生产者没有排序保证。如果需要,您必须在应用程序代码的消费者端安排消息(例如通过查看消息时间戳)。
  • @Thilo,这让我想到了另一个问题。每个分区或每个主题或每个生产者的批次?或者这些的某种组合。 ?

标签: apache-kafka kafka-producer-api


【解决方案1】:

当 max.in.flight.requests.per.connection = 1 时,仅表示在分区内保证消息的顺序与同步无关。

Python 代码以防万一。 对于同步发送,请确保在未来有一个良好的超时时间。

from kafka import KafkaProducer
from kafka.errors import KafkaError

#by default ack = 1, if ack = 'all' --> waits for acks from replicas 
producer = KafkaProducer(bootstrap_servers=['brokerIP:9092'], ack= 'all')


key = b'key'
value = b'value'

future = producer.send("my-topic", key=key, value=value)

# block on this future for sync sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    log.exception()
    pass

print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

producer.flush()
producer.close()

【讨论】:

    【解决方案2】:

    根据我对 Kafka 的冒险 :-) 只有当您有一个 Producer 线程并设置 max.in.flight.requests.per.connection = 1(或转为 retries,即 retries= 0 或两者兼有)时,才能保证消息生产的顺序。

    如果您要扩展到多个 Producer,那么您必须“确保”将存储到同一分区的消息将由同一 Producer 实例生成。

    【讨论】:

      【解决方案3】:

      正如 Thilo 建议的那样,您可以致电 Future#get 阻止直到发送完成。但是,您可能会遇到一些性能问题,因为当生产者队列有 batch.size 元素时,当大小为 buffer.memory 的缓冲区已满或在 max.block.ms 毫秒之后,生产者开始发送。

      如果推送到 kafka 的线程数量有限,则每次都必须等待 max.block.ms 才能发送消息。所以在某些情况下,你会更喜欢使用:

      // send message to producer queue
      Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
      // flush producer queue to spare queuing time
      producer.flush();
      // throw error when kafka is unreachable
      future.get(10, TimeUnit.SECONDS);
      

      【讨论】:

        【解决方案4】:

        Thilo 提出的答案是可行的方法。一般来说,您关于使用 max.in.flight.requests.per.connection = 1 的建议用于启用仍然重试但不会丢失消息排序。它不那么用于拥有同步生产者。

        【讨论】:

          【解决方案5】:

          生产者 API 从 send 返回一个 Future。您可以致电Future#get 阻止直到发送完成。

          看到这个example from the Javadocs

          如果你想模拟一个简单的阻塞调用,你可以立即调用 get() 方法:

           byte[] key = "key".getBytes();
           byte[] value = "value".getBytes();
           ProducerRecord<byte[],byte[]> record = 
               new ProducerRecord<byte[],byte[]>("my-topic", key, value)
           producer.send(record).get();
          

          【讨论】:

          • 按照此答案中的建议调用 get() 只会告诉您消息已完全发送。就提供模拟同步调用的解决方案而言,这无济于事。一个涉及 kafka 的真正解决方案将涉及由完成消费者消费的某种完成消息,这将释放被阻塞的请求线程。
          • 如果“同步调用”的意思是你想等到你感兴趣的所有消费者都完成了你希望他们根据消息采取的任何行动,那么是的,答案是不提供“同步 RPC 机制”。但它确实解决了 OP 对producer.sendSync 的请求。
          • @Thilo 我认为我们还需要在 get() 方法之后刷新消息,以确保将其传递给代理。如果我错了,请纠正我。
          • @AnkitSingodia 阻塞直到从生产者返回的 Future 完成将(默认情况下)包括来自经纪人的交付确认。这可以通过acks producer 参数进行配置。您可以将其设置为 0 以禁用确认,或者将其设置为大于 1 的值以同时确认成功复制到配额。
          猜你喜欢
          • 2022-11-11
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2019-11-21
          • 2016-02-18
          • 2017-02-23
          相关资源
          最近更新 更多