【问题标题】:Kafka produce.send never sends the messageKafka producer.send 从不发送消息
【发布时间】:2017-07-14 00:33:18
【问题描述】:

我使用 Kafka 2.12 和 kafka-python 模块作为 Kafka 客户端。我正在尝试测试一个简单的生产者:

class Producer(Process):
daemon = True
def run(self):
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    print("Sending messages...")
    producer.send('topic', json.dumps(message).encode('utf-8'))

当这个进程被实例化时,消息永远不会被消费者接收到

如果我刷新生产者并更改 linger_ms 参数(使其同步),则消息由消费者发送和读取:

class Producer(Process):
daemon = True
def run(self):
    producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
    print("Sending messages...")
    producer.send('topic', json.dumps(message).encode('utf-8'))
    producer.flush()

在以前的 Kafka 版本中,有参数 queue.buffering.max.ms 来指定生产者将等待多长时间直到发送队列中的消息,但在最新版本中不存在(kafka-python 1.3. 3)。我如何在较新的 Kafka 版本中指定这一点以保持我的通信异步?

谢谢!

【问题讨论】:

    标签: python apache-kafka kafka-python


    【解决方案1】:

    正如您所观察到的,消息排队等待异步发送,并且不能保证它会立即发送。因此,如果您想强制将消息发送到代理,则需要显式调用producer.flush(),这将阻塞直到发送消息(尽管flush() 不保证确认)。

    注意:因为flush() 是一个阻塞调用,它通常只推荐用于低吞吐量系统或在应用程序关闭时。同步发送与异步发送的吞吐量命中通常对于大容量系统是不可行的。我的经验是,生产者通常会很快发送而不需要调用 flush(),除了测试套件/开发需要立即发生的地方。

    我很确定参数 queue.buffering.max.ms 已被 linger_ms 替换:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer

    所以您已经在工作示例中使用了该参数。

    【讨论】:

    • 哇,谢谢。相当恼人的是,kafka-python.readthedocs.io/en/master/usage.html 上有十几种不同的发送示例,但底部只有一个 .flush() 语句,很容易错过。
    • 没错,对于刚接触 Kafka 的人来说,这可能会让人感到困惑。它是开源的,所以我相信他们会欢迎 PR 来改进他们的文档。此外,async send + flush() 是其他语言中 kafka 客户端库生态系统中最常见的 API 模型,因此了解 Kafka 但不了解 Python 的人可能不会对此感到惊讶。
    • 你说得对,它是开源的,所以我不能抱怨。那天早些时候我一直在阅读融合文档,但忘记了我指的是哪一个。冲洗范式在 Kafka 之外也很常见,但我在文档或代码中找不到它(底部除外)
    【解决方案2】:
    producer = KafkaProducer(bootstrap_servers='kafkaIp:kafkaPort')
    producer.send("topic_name", b'Your string here')
    producer.flush()
    

    使用发送和刷新。

    【讨论】:

      【解决方案3】:

      我们希望确保我们的消息能够快速发送,因此我们只是添加了一个单独的线程,该线程运行一个 while 循环,除了调用 producer.flush(timeout = 0.1) 和 sleep* 100 毫秒之外什么都不做。

      我们不想消除批处理带来的所有吞吐量优势,但我们还想确保在流量较低时以最小的延迟(毫秒,而不是分钟)处理消息。

      * 我们使用gevent。如果您使用普通的threading,则可能不需要睡眠。

      【讨论】:

        猜你喜欢
        • 2019-01-24
        • 2017-02-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-06-12
        • 1970-01-01
        • 2015-08-11
        相关资源
        最近更新 更多