【问题标题】:Spring Cloud Stream [2021.0.5] Kafka Batch mode Avro native encoding doesn't work with spring cloud sleuthSpring Cloud Stream [2021.0.5] Kafka Batch 模式 Avro 本机编码不适用于 spring cloud sleuth
【发布时间】:2023-02-12 16:46:49
【问题描述】:

我正在努力将 spring boot 升级到 2.7.8,将 spring cloud 升级到 2021.0.5。

我有 Spring 云流 kafka 消费者在批处理模式下使用 avro 反序列化,我试图使用 useNativeEncoding according to documentation

问题是当使用 Message<List> 的输入时,spring 云流代码在此类 SimpleFunctionRegistry 中覆盖(使用侦探时)本机编码标志为 false,此消息有效负载为空。

不使用 Message> 它工作正常,即列表。

在花了一天多的时间在不了解原因的情况下尝试调试问题后,我将它带到一个副项目中进行测试,但在使用 sleuth 后它停止了工作。

错误

问题是方法private FunctionInvocationWrapper wrapInAroundAdviceIfNecessary(FunctionInvocationWrapper function)上的类SimpleFunctionRegistry,它调用应用并覆盖标志

spring cloud stream 团队有什么解决方法吗?还是一个简单的修复?

应用程序.yaml 示例

spring:
  cloud:
    stream:
      binders:
        kafka-string-avro-native:
          type: kafka
          defaultCandidate: true
          environment.spring.cloud.stream.kafka.binder.consumerProperties:
            dlqProducerProperties.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
            dlqProducerProperties.configuration.value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: ${SCHEMA_REGISTRY_URL:http://0.0.0.0:55013}
            specific.avro.reader: true
            useNativeDecoding: true

      bindings:
        revenueEventConsumer-in-0:
          binder: kafka-string-avro-native
          destination: email.campaign_revenue_events
          group: test-4
          consumer:
            concurrency: 1
            batch-mode: true
            use-native-decoding: true
      function:
        definition: revenueEventConsumer
      kafka:
        binder:
          brokers: 0.0.0.0:55008

【问题讨论】:

    标签: spring-boot spring-cloud-stream spring-cloud-sleuth spring-cloud-stream-binder-kafka


    【解决方案1】:

    我通过覆盖 Bean TraceFunctionAroundWrapper 和覆盖 setSkipInputConversion(true) 找到了解决该问题的方法

    见下面的代码

    @Bean
        @Primary
        TraceFunctionAroundWrapper customTraceFunctionAroundWrapper(Environment environment, Tracer tracer, Propagator propagator,
                                                              Propagator.Setter<MessageHeaderAccessor> injector, Propagator.Getter<MessageHeaderAccessor> extractor,
                                                              ObjectProvider<List<FunctionMessageSpanCustomizer>> customizers) {
            return new CustomTraceFunctionAroundWrapper(environment, tracer, propagator, injector, extractor,
                customizers.getIfAvailable(ArrayList::new));
        }
    
    public class CustomTraceFunctionAroundWrapper extends TraceFunctionAroundWrapper {
        public CustomTraceFunctionAroundWrapper(Environment environment, Tracer tracer,
                                                Propagator propagator,
                                                Propagator.Setter<MessageHeaderAccessor> injector,
                                                Propagator.Getter<MessageHeaderAccessor> extractor) {
            super(environment, tracer, propagator, injector, extractor);
        }
    
        public CustomTraceFunctionAroundWrapper(Environment environment, Tracer tracer, Propagator propagator, Propagator.Setter<MessageHeaderAccessor> injector,
                                                Propagator.Getter<MessageHeaderAccessor> extractor,
                                                List<FunctionMessageSpanCustomizer> customizers) {
            super(environment, tracer, propagator, injector, extractor, customizers);
        }
    
        @Override
        protected Object doApply(Object message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
            targetFunction.setSkipInputConversion(true);
            return super.doApply(message, targetFunction);
        }
    }
    

    这只是一个解决方法,直到错误被修复是 spring cloud stream 和 sleuth

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-04-28
      • 2017-06-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-09
      • 2020-01-01
      相关资源
      最近更新 更多