【问题标题】:How to provide Kafka Streams properties via Spring Cloud Stream in YAML?如何通过 YAML 中的 Spring Cloud Stream 提供 Kafka Streams 属性?
【发布时间】:2020-04-27 06:42:53
【问题描述】:

我想将spring.kafka.streams.* 移动到spring.cloud.stream 下 - 这可能吗?我想到了streams-properties,类似于consumer-propertiesproducer-properties,但它不起作用。

spring:
  cloud:
    config:
      override-system-properties: false
      server:
        health:
          enabled: false
    stream:
      bindings:
        input_technischerplatz:
          destination: technischerplatz
        output_technischerplatz:
          destination: technischerplatz
      default:
        group: '${spring.application.name}'
        consumer:
          max-attempts: 5
      kafka:
        binder:
          auto-add-partitions: false
          auto-create-topics: false
          brokers: '${values.spring.kafka.bootstrap-servers}'
          configuration:
            header.mode: headers
          consumer-properties:
            allow.auto.create.topics: false
            auto.offset.reset: '${values.spring.kafka.consumer.auto-offset-reset}'
            enable.auto.commit: false
            isolation.level: read_committed
            max.poll.interval.ms: 300000
            max.poll.records: 100
            session.timeout.ms: 300000
          header-mapper-bean-name: defaultKafkaHeaderMapper
          producer-properties:
            acks: all
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            max.in.flight.requests.per.connection: 1
            max.block.ms: '${values.spring.kafka.producer.max-block-ms}'
            retries: 10
          required-acks: -1
  kafka:
    streams:
      applicationId: '${spring.application.name}_streams'
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.timestamp.extractor: org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp
        state.dir: '${values.spring.kafka.streams.properties.state.dir}'

【问题讨论】:

    标签: apache-kafka apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka


    【解决方案1】:

    您可以通过以下方式将流属性与spring.cloud.stream 绑定:

    spring.cloud.stream.kafka.streams.binder.applicationId: my-application-id
    spring.cloud.stream.kafka.streams.binder.configuration:
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    

    更多细节可以参考文档:

    https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder

    【讨论】:

    • 然后我收到以下错误:``` org.apache.kafka.common.errors.InconsistentGroupProtocolException: 组成员支持的协议与现有成员或第一个尝试加入的组成员的协议不兼容空协议类型或空协议列表。 ```
    • 看来您对 Streams 和 Consumer 使用了相同的 group.id。属于同一组的所有消费者必须声明一个共同的策略。如果一个消费者试图加入一个分配配置与其他组成员不一致的组,你最终会遇到这个异常。
    • 即使我删除 spring.cloud.stream.default.group 错误仍然存​​在。你有什么建议来解决这个问题?
    • 看起来可能是配置问题。确保在spring.cloud.stream.kafka.streams.binder.applicationId 下设置applicationId。从您的配置中不清楚它们是否对齐。如果您有多个 Kafka Streams 处理器,参考文档建议了几种设置应用程序 ID 的方法。
    猜你喜欢
    • 2021-10-14
    • 1970-01-01
    • 1970-01-01
    • 2018-12-29
    • 2017-11-24
    • 2019-06-24
    • 1970-01-01
    • 2023-04-05
    • 1970-01-01
    相关资源
    最近更新 更多