【问题标题】:Spring Cloud Stream deserializing invalid JSON from Kafka TopicSpring Cloud Stream 反序列化来自 Kafka 主题的无效 JSON
【发布时间】:2021-08-20 13:57:43
【问题描述】:

我正在努力将 Spring Cloud Streams 与 Kafka binder 集成。目标是我的应用程序使用主题中的 json 并将其反序列化为 Java 对象。我正在使用功能样式方法而不是命令式方法。我的代码使用结构良好的 json 输入。

另一方面,当我发送无效的 json 时,我希望触发错误记录方法。这在某些测试用例中有效,而在其他测试用例中无效。我的应用程序会反序列化 json,即使它无效并触发包含逻辑的方法,而不是错误记录方法。

我无法解决框架反序列化一些非结构化 json 输入的问题。

@Builder
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class KafkaEventRecord {

    @JsonProperty(value = "transport_metadata", required = true)
    @NonNull
    private TransportMetadata transportMetadata;

    @JsonProperty(value = "payload", required = true)
    @NonNull
    private Payload payload;
}


@Component
public class TokenEventConsumer {

    @Bean
    Consumer<KafkaEventRecord> consumer() {
        return event -> {
            log.info("Kafka Event data consumed from Kafka {}", event);
        };
    }
}

@Configuration
@Slf4j
public class CloudStreamErrorHandler {

    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(ErrorMessage errorMessage) {
            log.error("Error Message is {}", errorMessage);
    }
}

@EmbeddedKafka(topics = {"batch-in"}, partitions = 3)
@TestPropertySource(properties = {"spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"})
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles("test")
@Slf4j
public class KafkaTokenConsumerTest {

    private static String TOPIC = "batch-in";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaListenerEndpointRegistry endpointRegistry;

    @Autowired
    private ObjectMapper objectMapper;

    @SpyBean
    KafkaEventHandlerFactory kafkaEventHandlerFactory;

    @SpyBean
    CloudStreamErrorHandler cloudStreamErrorHandler;


    @BeforeEach
    void setUp() {
        for (MessageListenerContainer messageListenerContainer : endpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
        }
    }

    // THIS METHOD PASSES
    @Test
    public void rejectCorruptedMessage() throws ExecutionException, InterruptedException {

        kafkaTemplate.send(TOPIC, "{{{{").get(); // synchronous call

        CountDownLatch latch = new CountDownLatch(1);
        latch.await(5L, TimeUnit.SECONDS);

        // The frame works tries two times, no idea why
        verify(cloudStreamErrorHandler, times(2)).handleError(isA(ErrorMessage.class));

    }

    // THIS METHOD FAILS
    @Test
    public void rejectCorruptedMessage2() throws ExecutionException, InterruptedException {

        kafkaTemplate.send(TOPIC, "{}}}").get(); // synchronous call

        CountDownLatch latch = new CountDownLatch(1);
        latch.await(5L, TimeUnit.SECONDS);

        // The frame works tries two times, no idea why
        verify(cloudStreamErrorHandler, times(2)).handleError(isA(ErrorMessage.class));

    }
}
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

// Producer only for testing purpose
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
rejectCorruptedMessage测试方法中的

json,触发handleError(ErrorMessage errorMessage)方法,这是预期的,因为它是无效的json。另一方面, rejectCorruptedMessage2 测试方法中的 json,触发 TokenEventConsumer 类中的 Consumer&lt;KafkaEventRecord&gt; consumer() 方法,这不是预期的行为,但是,我得到了具有空值的 KafkaEventRecord 对象。

【问题讨论】:

  • 您应该在生产者端使用StringSerializer,因为您发送的是原始json; JsonSerializer 会将有效负载转换为有效的 JSON 字符串。
  • 新配置如下:spring.deserializer.key.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer spring.deserializer.value.delegate.class=org.springframework.kafka .support.serializer.JsonDeserializer spring.json.value.default.type=com.batikan.api.v1.kafka.event.consumer.KafkaEventRecord spring.json.use.type.headers=false 但是问题还在继续。当我发送“{{{{”有效负载时,我收到 org.springframework.kafka.support.serializer.DeserializationException。但是,当我发送“{}}}”时,我收到了带有空字段的 KafkaEventRecord 对象。
  • 我看到你是新来的——不要把这样的东西放在 cmets 中;它渲染得不好;改为编辑问题并评论您已这样做。您误读了我的评论,我认为问题出在发送方。
  • 看起来 Jackson 只是忽略了尾随的 }} 并将 {} 解码为空对象。

标签: spring-boot apache-kafka spring-cloud-stream embedded-kafka


【解决方案1】:

Jackson 不认为这是无效的 JSON,它只是忽略了结尾的 }} 并将 {} 解码为空对象。

public class So67804599Application {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JavaType type = mapper.constructType(Foo.class);
        Object foo = mapper.readerFor(Foo.class).readValue("{\"bar\":\"baz\"}");
        System.out.println(foo);
        foo = mapper.readerFor(Foo.class).readValue("{}}}");
        System.out.println(foo);
    }

    public static class Foo {

        String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
Foo [bar=baz]
Foo [bar=null]

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-07-18
    • 2019-06-17
    • 1970-01-01
    • 2020-09-20
    • 2019-04-17
    • 2021-02-24
    • 2021-10-13
    • 2017-11-20
    相关资源
    最近更新 更多