【发布时间】:2021-12-08 04:42:53
【问题描述】:
当在 docker 中独立运行 Pulsar 时,在特定情况下反序列化消息时,我们会遇到这个奇怪的问题。 我们使用的是 2.7.1 版本。
我们有一个创建主题和函数的脚本,然后为 JSON 类型的麻烦主题创建模式。整个架构是正确的,但类型不正确。这都是在发送任何消息之前。
我们还启用了set-is-allow-auto-update-schema。
我们称之为trouble-topic,它由两个来源填充:ValidationFunction 和一个 Spring Boot 微服务。
ValidationFunction 验证消息,如果消息有效,它将映射消息发送到 Spring Boot 微服务使用的主题,然后对其执行一些逻辑并将其发送到 trouble-topic,但如果验证失败则直接向trouble-topic发送消息。
当使用 Spring Boot 微服务中的 sendAsync 和以下生产者时,架构得到更新,将 AVRO 作为类型,并且 TroubleFunction 读取 trouble-topic 之后工作正常:
pulsarClient
.newProducer(AvroSchema.of(OurClass.class))
.topic(troubleTopicName))
.create()
但是如果在此之前有些消息没有通过验证,并且在使用上述Producer之前将消息直接发送到trouble-topic,我们会得到一个解析异常。我们通过以下方式从函数发送消息:
context.newOutputMessage(troubleTopicName, AvroSchema.of(OurClass.class))
.value(value)
.sendAsync();
由于某种原因,这不会更新架构类型,架构类型仍然是 JSON。我使用 pulsar admin CLI 验证了每个步骤的模式类型。而当这种情况发生在微服务生产者第一次更新模式类型之前,TroubleFunction 读取trouble-topic 失败并出现以下错误:
11:43:49.322 [tenant/namespace/TroubleFunction-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [tenant/namespace/TroubleFunction:0] Uncaught exception in Java Instance
org.apache.pulsar.client.api.SchemaSerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 2)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])avro-serialized-msg-i-have-to-hide Parsing exception: cvc-complex-type.2.4.a: Invalid content was found starting with element 'ElementName'. One of '{"foo:bar":ElementName}' is expected."; line: 1, column: 2]
所以我的问题是这两者之间有什么区别,为什么从函数发送消息不能正确更新架构类型?不是在下面使用同一个 Producer 吗?还有一种方法可以解决这个问题,以便在初始化时设置模式类型,或者至少在从函数发送消息时更新模式类型?
【问题讨论】:
标签: java avro apache-pulsar pulsar