【问题标题】:what happen to the messages when connection is down between kafka server and producer?当 kafka 服务器和生产者之间的连接断开时,消息会发生什么?
【发布时间】:2019-07-19 01:05:48
【问题描述】:

我是使用 spring boot 的 kafka 新手,我在 projet 工作,我想使用 spring 将 kafka 集成到其中, 所以问题是我想从生产者向消费者发送消息,即使 kafka 服务器没有运行(离线模式)

谁能给我一个如何在离线模式下使用kafka的例子, 我找不到这个主题的教程 我想停止我的 kafka 服务器(例如),同时生产者想向主题发送数据,那么消费者可以得到这些消息吗? 最好的解决方案是什么?它们是真的吗?

*将数据发送到文件,当服务器返回运行时(例如我测试连接),我将数据从文件导出到主题

*将数据发送到数据库,当服务器返回运行(测试连接)时,我将消息(数据)从数据库发送到我的主题

*使用队列或列表来存储消息,当服务器返回运行(测试连接)时,我将数据从列表发送到主题,但问题是我有很多消息

-->如果有其他解决方案和一个简单的例子,谁能帮助我?

这是一个代理 Redis 的例子,我们测试 Redis 代理和生产者之间的连接,如果连接失败,我会将数据存储在一个 Queue 中,该队列可以存储许多消息,当 Redis 和 producer 之间的连接恢复工作时,producer 现在从 Queue 中获取这些消息并将它们发送到 Redis Brocker。

但是这个broker的问题,有一些消息丢失了 所以我们决定在我的项目中集成 kafka brocker 而不是 Redis brocker!

谁能给我举个例子,在 java 中如何在生产者将大量消息发送到 kafka 集群之前存储它们?或者因为我们不想使用相同的队列解决方案,所以这个问题的最佳解决方案是什么?

python 中的这个例子是如果连接到服务器失败,如何将消息存储在队列中:

    try:
    urllib.request.urlopen('http://serverAdress', timeout=0.1)
    r.publish(topicProduction,json_background_of_message1)
    print(json_background_of_message1)
    arretControle=Tru
    except Exception as e:
    qArret.put(json_background_of_message1)
    print("arret")
    arretControle=True

//json_background_of_message1是一个Queue,如果连接失败,我们可以在这个Queue中存储很多消息,然后再发送这些消息

【问题讨论】:

  • Kafka 集群应该一直在运行。少数代理可以关闭,集群将正常工作(取决于复制因子)。没有像using kafka in offline mode 这样的东西。
  • 例如,如果我的生产者想发送 1000 条消息,但在生产者发送 10 条消息后,我的远程 kafka 服务器突然停机(我的应用程序服务器突然停机)! ! 990 条消息(其余消息)发生了什么?是否有存储这些消息的解决方案,或者如果我的 kafka 服务器出现故障我该怎么办?
  • Kafka 作为集群运行,因此即使一个代理(远程 kafka 服务器)宕机,您也可以发送和消费消息。您必须正确设置复制因子。我建议您阅读 Kafka 文档 (kafka.apache.org/documentation)。真的很好。
  • 在我工作的项目中,之前的开发者已经集成了Redis broker,他已经测试了redis和生产者之间的连接,如果有连接-> 我们可以将生产者消息发送到我们的代理redis,否则我们会将生产者发送的消息存储在队列中(在列表中),当连接回来时,生产者可以发送消息......但是有一个丢失的消息
  • 感谢 wardziniak 为您提供帮助我做同样的事情还是什么?你明白我的意思吗?

标签: java spring-boot apache-kafka


【解决方案1】:

Kafka 被设计成一个高度可用的消息传递系统。正确配置,并且根据复制因素,您可以让多个代理完全关闭,一次完全关闭数天,并且集群仍然可以工作(尽管可能在更高的负载下)。一旦成功部署,我使用过的每个 Kafka 生产集群都没有完全关闭。我们有个别经纪人倒闭,有时连续几天倒闭,但这从来都不是问题。

您提出的是一种后备或备份方法,以防 Kafka 不可用。但是,您仍然有同样的问题。如果将消息转储到文件中,多久会耗尽磁盘空间?如果将消息存储在数据库中,数据库空间用完需要多长时间?如果您将消息存储在内存队列中,多久会耗尽内存并导致应用程序崩溃?而且现在您还必须构建一种机制来从 kafka 中断中恢复,这会增加复杂性和开销。

使用 Kafka 的最佳方法是对其进行配置并将其作为高可用性系统进行处理,正确配置警报和指标,这样您就会立即收到警报,并在出现问题时及时做出反应。此外,您应该始终调整和测试您的应用程序,以便您有足够的空间来处理最坏的情况。如果您将其配置为使用复制因子 3,您将能够丢失任意两个代理,并且集群仍然能够正常运行而不会丢失数据。

现在,在应用程序方面,您在 Kafka 不可用时的行为应该取决于消息的重要性。如果您可以容忍丢失消息,那么如果生产者返回异常,则将其丢弃,并记录/发送警报。但是,如果它们是非常重要的记录,那么在您完全确认它们已保存在 Kafka 中之前,您不应确认/提交上游系统(无论记录来自何处)上的消息。为此,我建议将生产者确认设置为-1all,在失败的情况下多次重试,并在producer.send() 方法上设置适当的回调方法。更详细的解释见这里:https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/Callback.html

如其他人所说,欲了解更多详情,请阅读官方文档:https://kafka.apache.org/documentation/

【讨论】:

    猜你喜欢
    • 2020-02-09
    • 1970-01-01
    • 2017-06-10
    • 1970-01-01
    • 2011-06-12
    • 1970-01-01
    • 1970-01-01
    • 2021-08-17
    • 1970-01-01
    相关资源
    最近更新 更多