【问题标题】:Writing to a topic from a Processor in a Spring Cloud Streams Kafka Stream application从 Spring Cloud Streams Kafka Stream 应用程序中的处理器写入主题
【发布时间】:2020-05-02 11:14:16
【问题描述】:

我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入一个主题。如何在 Spring Cloud Streams Kafka 应用程序中完成?

@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->

    event.map{
        ...
    }.process(ProcessorSupplier {

            object : Processor<EventId, MappedEventValue> {

                private lateinit var store: KeyValueStore<EventId, MappedEventValue>

                override fun init(context: ProcessorContext) {
                    store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
                }

                override fun process(key: EventId, value: MappedEventValue) {
                    ...
                    store.put(key, processedMappedEventValue)

                    //TODO Write into a topic
                }
            }
    }
}  

【问题讨论】:

    标签: apache-kafka apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka


    【解决方案1】:

    你不能。 process() 方法是一种终端操作,不允许您向下游发出数据。相反,您可以使用transform()(它与process() 基本相同,但允许您向下游发出数据);或取决于您的应用,transformValues()flatTransform() 等。

    使用transform() 可以返回KStream,可以写入主题。

    【讨论】:

    • 如答案中所述,使用transform 可以让您获得KStream,您可以将其发送到主题。在这种情况下,您可能希望使用 Function 作为返回类型。如果您有一个必须使用process() 方法的用例,那么我想您仍然可以直接使用Spring Kafka 中的KafkaTemplate 或使用Spring Cloud Stream 中的StreamBrdige API 发送到主题。但是,按照建议使用变压器要好得多。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-07
    • 2021-10-26
    • 1970-01-01
    • 1970-01-01
    • 2019-10-30
    • 1970-01-01
    相关资源
    最近更新 更多