【发布时间】:2019-09-07 09:25:06
【问题描述】:
这是我的应用程序,它仅从客户主题(输入绑定)和订单主题(订单绑定)获取对 KStream 的引用。然后它从客户主题创建一个 KTable 并与订单 KStream 执行连接:
@Configuration
class ShippingKStreamConfiguration {
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") order: KStream<Int, Order>): KStream<Int, OrderShipped> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderSerde = JsonSerde<Order>(Order::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (order.selectKey { key, value -> value.customerId } as KStream<Int, Order>)
.join(customerTable, { orderIt, customer ->
OrderShipped(orderIt.id)
},
Joined.with(intSerde, orderSerde, customerSerde))
}
}
假设这应该写入一个输出绑定 (@SendTo("output")),指向一个 ordershipment 主题。但是,不会向该主题写入任何消息。
处理器配置:
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
@Input("order")
fun order(): KStream<String, Order>
@Input("output")
fun output(): KStream<String, OrderShipped>
}
**application.yml**
spring:
application:
name: spring-boot-shipping-service
cloud:
stream:
kafka:
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
input:
destination: customer
contentType: application/json
order:
destination: order
contentType: application/json
output:
destination: ordershipments
contentType: application/json
【问题讨论】:
标签: apache-kafka spring-cloud apache-kafka-streams spring-cloud-stream