【发布时间】:2018-07-09 02:19:36
【问题描述】:
我想让 flink 消费者流中的每条消息产生多条消息,每条消息都通过一个单独的线程,使用 flink kafka 生产者发送到 kafka 中的某个主题。我用 Scala 编写程序,但用 Java 回答就可以了
类似这样的:
def thread(x:String): Thread =
{
val thread_ = new Thread {
override def run {
val str = some_processing(x)
flink_producer(str)
}
}
return thread_
}
val stream = flink_consumer()
stream.map(x =>{
var i = 0
while(i < 10){
val th = thread(x)
th.start()
i = i+1
}
})
因此,对于 flink 消费者中的每个输入,我想使用多线程向其他队列生成 10 条消息。
【问题讨论】:
标签: apache-kafka apache-flink flink-streaming