【发布时间】:2016-08-03 19:58:08
【问题描述】:
卡夫卡 0.8V
我想发布 /consume byte[] 对象、java bean 对象、可序列化对象等等。
为这种类型的场景定义发布者和消费者的最佳方式是什么? 当我消费来自消费者迭代器的消息时,我不知道它是什么类型的消息。 谁能指点我如何设计此类场景的指南?
【问题讨论】:
标签: apache-kafka kafka-consumer-api kafka-producer-api
卡夫卡 0.8V
我想发布 /consume byte[] 对象、java bean 对象、可序列化对象等等。
为这种类型的场景定义发布者和消费者的最佳方式是什么? 当我消费来自消费者迭代器的消息时,我不知道它是什么类型的消息。 谁能指点我如何设计此类场景的指南?
【问题讨论】:
标签: apache-kafka kafka-consumer-api kafka-producer-api
我对每个 Kafka 主题强制执行一个模式或对象类型。这样,当您收到消息时,您就知道自己收到了什么。
至少,您应该决定给定主题是要保存binary 还是string 数据,并根据它如何进一步编码。
例如,您可以有一个名为 Schema 的主题,其中包含以字符串形式存储的 JSON 编码对象。
如果您使用JSON 和像 JavaScript 这样的松散类型语言,那么在同一个主题中存储具有不同模式的不同对象可能会很诱人。使用 JavaScript,您只需调用 JSON.parse(...),查看生成的对象,然后弄清楚您想用它做什么。
但是你不能在像 Scala 这样的严格类型语言中做到这一点。 Scala JSON 解析器通常希望您将 JSON 解析为已定义的 Scala 类型,通常是 case class。它们不适用于此模型。
一种解决方案是保留一个模式/一个主题规则,但有点作弊:将一个对象包装在一个对象中。一个典型的例子是一个 Action 对象,其中您有一个描述该操作的标头,以及一个有效负载对象,其架构取决于标头中列出的操作类型。想象一下这个伪模式:
{name: "Action", fields: [
{name: "actionType", type: "string"},
{name: "actionObject", type: "string"}
]}
这样,即使在强类型语言中,您也可以执行以下操作(这也是伪代码):
action = JSONParser[Action].parse(msg)
switch(action.actionType) {
case "foo" => var foo = JSONParser[Foo].parse(action.actionObject)
case "bar" => var bar = JSONParser[Bar].parse(action.actionObject)
}
这种方法的一个巧妙之处在于,如果您有一个消费者只在等待特定的action.actionType,并且只是要忽略所有其他的,那么它非常轻量级,它只解码标题并将其放入关闭解码action.actionObject,直到需要时为止。
到目前为止,这都是关于字符串编码的数据。如果您想处理二进制数据,当然您也可以将其包装在 JSON 中,或者任何基于字符串的编码(如 XML)中。但也有许多二进制编码系统,例如 Thrift 和 Avro。实际上,上面的伪模式是基于 Avro 的。您甚至可以在 Avro 中做一些很酷的事情,例如模式演化,其中包括提供一种非常巧妙的方式来处理上述 Action 用例——您可以定义一个作为子集的模式,而不是将对象包装在对象中其他模式并仅解码您想要的字段,在这种情况下只是 action.actionType 字段。这是对schema evolution的非常出色的描述。
简而言之,我推荐的是:
【讨论】: