【问题标题】:Spring cloud stream Confluent KStream Avro ConsumeSpring Cloud Stream Confluent KStream Avro 消费
【发布时间】:2020-03-20 16:53:47
【问题描述】:

我正在尝试使用 Spring Boot 2.0 将来自 kafka 主题的融合 avro 消息作为 Kstream 使用。

我能够以MessageChannel 的身份使用该消息,但不能以KStream 的身份使用。

@Input(ORGANIZATION)
KStream<String, Organization> organizationMessageChannel();


@StreamListener
    public void processOrganization(@Input(KstreamBinding.ORGANIZATION)KStream<String, Organization> organization) {
        log.info("Organization Received:" + organization);
 }

例外:

线程异常 “pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3” org.apache.kafka.streams.errors.StreamsException:流线程 [pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3] 失败 再平衡。在 org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860) 在 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808) 在 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) 在 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) 引起:org.apache.kafka.streams.errors.StreamsException:失败 配置值 serde 类 io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde 在 org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:859) 在 org.apache.kafka.streams.processor.internals.AbstractProcessorContext.(AbstractProcessorContext.java:59) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.(ProcessorContextImpl.java:42) 在 org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:134) 在 org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404) 在 org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365) 在 org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350) 在 org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137) 在 org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88) 在 org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) 在 org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) 在 org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851) ... 3 更多原因:io.confluent.common.config.ConfigException: 缺少所需的配置“schema.registry.url”,其中没有 默认值。在 io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243) 在 io.confluent.common.config.AbstractConfig.(AbstractConfig.java:78) 在 io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.(AbstractKafkaAvroSerDeConfig.java:61) 在 io.confluent.kafka.serializers.KafkaAvroSerializerConfig.(KafkaAvroSerializerConfig.java:32) 在 io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48) 在 io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58) 在 io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107) 在 org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:855) ... 19 更多

基于错误,我认为我缺少配置 schema.registry.url 以进行融合。 我快速浏览了示例here 关于如何使用 streamListener 对 Spring Cloud Stream 执行相同操作,有点迷失了

这需要单独配置吗?或者有没有办法在 application.yml 本身中配置 schema.registry.url confluent 正在寻找?

这里是代码仓库https://github.com/naveenpop/springboot-kstream-confluent

Organization.avsc

{
   "namespace":"com.test.demo.avro",
   "type":"record",
   "name":"Organization",
   "fields":[
      {
         "name":"orgId",
         "type":"string",
         "default":"null"
      },
      {
         "name":"orgName",
         "type":"string",
         "default":"null"
      },
      {
         "name":"orgType",
         "type":"string",
         "default":"null"
      },
      {
         "name":"parentOrgId",
         "type":"string",
         "default":"null"
      }
   ]
}

DemokstreamApplication.java

@SpringBootApplication
@EnableSchemaRegistryClient
@Slf4j
public class DemokstreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }

    @Component
    public  static class organizationProducer implements ApplicationRunner {

        @Autowired
        private KafkaProducer kafkaProducer;

        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("Starting: Run method");
            List<String> names = Arrays.asList("blue", "red", "green", "black", "white");
            List<String> pages = Arrays.asList("whiskey", "wine", "rum", "jin", "beer");
            Runnable runnable = () -> {
                String rPage = pages.get(new Random().nextInt(pages.size()));
                String rName = names.get(new Random().nextInt(names.size()));

                try {
                    this.kafkaProducer.produceOrganization(rPage, rName, "PARENT", "111");
                } catch (Exception e) {
                    log.info("Exception :" +e);
                }
            };
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable ,1 ,1, TimeUnit.SECONDS);
        }
    }
}

KafkaConfig.java

@Configuration
public class KafkaConfig {

    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
    private String endpoint;

    @Bean
    public SchemaRegistryClient confluentSchemaRegistryClient() {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }

}

KafkaConsumer.java

@Slf4j
@EnableBinding(KstreamBinding.class)
public class KafkaConsumer {

   @StreamListener
    public void processOrganization(@Input(KstreamBinding.ORGANIZATION_INPUT) KStream<String, Organization> organization) {
        organization.foreach((s, organization1) -> log.info("KStream Organization Received:" + organization1));
    }
}

KafkaProducer.java

@EnableBinding(KstreamBinding.class)
public class KafkaProducer {

    @Autowired
    private KstreamBinding kstreamBinding;

    public void produceOrganization(String orgId, String orgName, String orgType, String parentOrgId) {

        try {
            Organization organization = Organization.newBuilder()
                    .setOrgId(orgId)
                    .setOrgName(orgName)
                    .setOrgType(orgType)
                    .setParentOrgId(parentOrgId)
                    .build();

            kstreamBinding.organizationOutputMessageChannel()
                            .send(MessageBuilder.withPayload(organization)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, orgName)
                            .build());

        } catch (Exception e){
            log.error("Failed to produce Organization Message:" +e);
        }
    }
}

KstreamBinding.java

public interface KstreamBinding {

    String ORGANIZATION_INPUT= "organizationInput";
    String ORGANIZATION_OUTPUT= "organizationOutput";

    @Input(ORGANIZATION_INPUT)
    KStream<String, Organization> organizationInputMessageChannel();

    @Output(ORGANIZATION_OUTPUT)
    MessageChannel organizationOutputMessageChannel();
}

更新 1:

我应用了 dturanski here 的建议,错误消失了。但是仍然无法将消息作为KStream&lt;String, Organization&gt; 使用控制台中没有错误。

更新 2:

应用了来自 sobychacko here 的建议,并且该消息可用于对象中的空值。

我在 GitHub 示例中创建了一个 commit,以从 Spring Boot 本身生成消息,并且仍然将其作为空值获取。

感谢您在此问题上的宝贵时间。

【问题讨论】:

    标签: spring-boot apache-kafka apache-kafka-streams spring-cloud-stream confluent-schema-registry


    【解决方案1】:

    以下实现不会按照您的意图进行:

    @StreamListener
        public void processOrganization(@Input(KstreamBinding.ORGANIZATION)KStream<String, Organization> organization) {
            log.info("Organization Received:" + organization);
     }
    

    该日志语句仅在引导阶段调用一次。为了使它工作,您需要对收到的KStream 调用一些操作,然后在那里提供逻辑。例如以下作品我在 foreach 方法调用上提供 lambda 表达式。

     @StreamListener
        public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    
            organization.foreach((s, organization1) -> log.info("Organization Received:" + organization1));
        }
    

    您在配置中还有一个问题,您错误地将 avro Serde 分配给实际上是 String 的键。改成这样:

    default:
                    key:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value:
                      serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    

    通过这些更改,我每次向主题发送内容时都会收到日志记录。但是,您发送 groovy 脚本时存在问题,我没有从您的 Organization 域中获取任何实际数据,但我会让您弄清楚这一点。

    关于空 Organization 域对象的问题的更新

    发生这种情况是因为您正在使用混合模式的序列化策略。您在生产者端使用 Spring Cloud Stream 的 avro 消息转换器,但在 Kafka Streams 处理器上使用 Confluent avro Serdes。我刚刚尝试了从生产者到处理器的所有方式的 Confluent 序列化程序,我能够在出站时看到 Organization 域。这是为了使序列化一致而修改的配置。

    spring:
      application:
        name: kstream
      cloud:
        stream:
          schemaRegistryClient:
            endpoint: http://localhost:8081
          schema:
            avro:
              schema-locations: classpath:avro/Organization.avsc
          bindings:
            organizationInput:
              destination: organization-updates
              group: demokstream.org
              consumer:
                useNativeDecoding: true
            organizationOutput:
              destination: organization-updates
              producer:
                useNativeEncoding: true
          kafka:
            bindings:
              organizationOutput:
                producer:
                  configuration:
                    key.serializer: org.apache.kafka.common.serialization.StringSerializer
                    value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                    schema.registry.url: http://localhost:8081
            streams:
              binder:
                brokers: localhost
                configuration:
                  schema.registry.url: http://localhost:8081
                  commit:
                    interval:
                      ms: 1000
                  default:
                    key:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value:
                      serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    
    

    您还可以从主应用程序类中删除KafkaConfig 类以及EnableSchemaRegistryClient 注释。

    【讨论】:

    • 感谢您回复此问题。我认为空对象问题与 groovy 无关,它似乎在其他地方。我尝试使用相同的 GitHub 示例从 spring boot 本身生成消息并获取相同的空对象。这是提交github.com/naveenpop/springboot-kstream-confluent/commit/…
    • 我更新了答案。请尝试进行这些更改,看看是否能解决您的问题。
    • 感谢您的更新。是的,提供的更新解决方案有效。使用以下属性有效。 spring.cloud.stream.bindings.organizationInput.consumer.useNativeDecoding = true spring.cloud.stream.bindings.organizationOutput.producer.useNativeEncoding = true
    【解决方案2】:

    试试spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url: ...

    【讨论】:

    • 谢谢 dturanski。我看到错误消失了。但是,我无法以KStream&lt;String, Object&gt; 的身份使用这些消息,并且没有报告错误。
    • 您能否将该示例简化为只有一个处理器?以及一些重现问题的说明。我还想知道您是否可以升级到最新版本的活页夹 (3.0.0.RELEASE),然后再试一次。
    • 我已将示例应用程序简化为只有一个处理器,并在此处添加了自述文件中的说明github.com/naveenpop/springboot-kstream-confluent/blob/master/… 另外,正如您所建议的,我尝试升级到最新的3.0.0.RELEASE 并收到失败introspect 类在同一篇文章中捕捉到了这一点。
    • 当你说升级到3.0.0.RELEASE 我想我可能也需要升级spring boot,不是吗?只是检查这是否不可能让它在 Spring boot 2.0.0.RELEASE 本身中工作。谢谢!
    猜你喜欢
    • 2017-02-17
    • 2018-03-09
    • 2019-02-05
    • 2018-01-31
    • 1970-01-01
    • 2019-01-11
    • 2021-12-30
    • 2019-07-17
    • 2018-05-17
    相关资源
    最近更新 更多