【发布时间】:2020-05-18 14:30:51
【问题描述】:
我有 5 个 kafka 主题,每秒将发送 100 条消息。消息格式如下
{null,JSON}(分区:10)
我需要根据 JSON 中的值提取过去的记录并在我的 java 应用程序中处理。正确的做法是什么?
- ksqlDB 流
- Kafka 流
- Ktable
提前致谢。
【问题讨论】:
标签: java apache-kafka ksqldb
我有 5 个 kafka 主题,每秒将发送 100 条消息。消息格式如下
{null,JSON}(分区:10)
我需要根据 JSON 中的值提取过去的记录并在我的 java 应用程序中处理。正确的做法是什么?
提前致谢。
【问题讨论】:
标签: java apache-kafka ksqldb
虽然可以将 Kafka 用作数据库,但我强烈建议您不要采用这种方法。 Kakfa 是一个消息代理,所以如果你想查看过去的消息你必须重新处理整个主题
您可能能够“优雅地”处理这种情况的唯一情况是,如果您知道消息的偏移量,那么您可以将您的消费者设置为直接去那里,但是由于您描述的用例,我不知道'认为你没有它
我没有使用过 ksqlDB,但它看起来像普通的 KQSL
KSQL 只是 Kafka Streams 之上的抽象,就像 Kafka Streams 是对消费者-生产者的抽象一样。每个抽象都有较少的功能
您可以使用任何您想要的方法来实现您的目标,请注意有几个因素会让您选择其中一种或另一种,例如
【讨论】:
首先,想想你想在你的应用程序中实现什么。如果您想要 DB 功能,例如复杂的连接、使用存储过程等,那么 Kafka 不是您的选择(即使 Kafka 可以进行复杂的连接,但需要您编写更复杂的代码)。
您的场景似乎适合Kafka Streams 拓扑,您可以在其中使用具体化的KTable(实际上是Store),这将允许您通过分区键查找数据。性能将与基于数据库的性能相同,因为 Store 它已具体化为 RocksDB 数据库。
请记住,这样做您将面临分区时常见的所有问题(首先,为所有数据项找到一个公共分区键)。
您可以在此处找到有关Kafka Streams 工作原理的更多详细信息:https://docs.confluent.io/current/streams/architecture.html 和关于Kafka Store:https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/Stores.html。
【讨论】: