【Question title】: Spring Cloud Stream Kafka Stream not writing to destination topic after join
【Posted】: 2019-09-07 09:25:06
【Description】:

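Here is my application. It simply obtains KStream references from the customer topic (the input binding) and the order topic (the order binding). It then builds a KTable from the customer topic and joins it with the order KStream: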

@Configuration
class ShippingKStreamConfiguration {


    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") order: KStream<Int, Order>): KStream<Int, OrderShipped> {

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = JsonSerde<Customer>(Customer::class.java)
        val orderSerde = JsonSerde<Order>(Order::class.java)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)

        return (order.selectKey { key, value -> value.customerId } as KStream<Int, Order>)
                .join(customerTable, { orderIt, customer ->
                    OrderShipped(orderIt.id)
                },
                        Joined.with(intSerde, orderSerde, customerSerde))

    }

}

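This is supposed to write to an output binding (@SendTo("output")) that points to the ordershipments topic. However, no messages are ever written to that topic.

Processor configuration: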

interface ShippingKStreamProcessor {

    @Input("input")
    fun input(): KStream<Int, Customer>

    @Input("order")
    fun order(): KStream<String, Order>

    @Input("output")
    fun output(): KStream<String, OrderShipped>

}

**application.yml**

spring:
  application:
    name: spring-boot-shipping-service
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        input:
          destination: customer
          contentType: application/json
        order:
          destination: order
          contentType: application/json
        output:
          destination: ordershipments
          contentType: application/json

【Discussion】:

    Tags: apache-kafka spring-cloud apache-kafka-streams spring-cloud-stream


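    【Solution 1】:

    The processor definition was wrong: the output binding was declared with @Input. Here is the corrected processor, which uses @Output instead: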

    interface ShippingKStreamProcessor {
    
        @Input("input")
        fun input(): KStream<Int, Customer>
    
        @Input("order")
        fun order(): KStream<String, Order>
    
        @Output("output")
        fun output(): KStream<String, OrderShipped>
    
    }
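
Once the output binding is declared with @Output, it can help to check what the topology should actually emit. The following is only a minimal plain-Kotlin sketch of the join logic from the question, with no Kafka or Spring involved. The `name` field, the `orderId` property name, and the helper functions `latestByKey` and `joinOrders` are assumptions made for illustration. It also ignores timing: in a real KStream-KTable join the table is looked up when each order arrives.

```kotlin
// Plain-Kotlin model of the join in the question; not the Kafka Streams API.
// Fields other than id and customerId are hypothetical.
data class Customer(val id: Int, val name: String)
data class Order(val id: Int, val customerId: Int)
data class OrderShipped(val orderId: Int)

// Models groupByKey().reduce { _, y -> y }: the latest value per key wins.
fun latestByKey(customers: List<Pair<Int, Customer>>): Map<Int, Customer> =
    customers.toMap()

// Models selectKey { _, value -> value.customerId } followed by an inner join:
// an order yields a result only if its customerId is present in the table.
fun joinOrders(orders: List<Order>, table: Map<Int, Customer>): List<Pair<Int, OrderShipped>> =
    orders
        .map { it.customerId to it }
        .filter { (key, _) -> key in table }
        .map { (key, order) -> key to OrderShipped(order.id) }

fun main() {
    val table = latestByKey(listOf(1 to Customer(1, "old"), 1 to Customer(1, "new")))
    // Order 11 refers to customer 2, which is not in the table, so it is dropped.
    println(joinOrders(listOf(Order(10, 1), Order(11, 2)), table))
}
```

Because the join is an inner join, an order whose customer is not yet in the table produces nothing, so an empty destination topic can also mean that no matching customer record was present when the order arrived.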
    
