【问题标题】:Running the Flink consumer with Flink producer in loop within the Consumer stream using threads使用线程在消费者流中循环运行 Flink 消费者和 Flink 生产者
【发布时间】:2018-07-09 02:19:36
【问题描述】:

我想让 flink 消费者流中的每条消息产生多条消息,每条消息都通过一个单独的线程,使用 flink kafka 生产者发送到 kafka 中的某个主题。我用 Scala 编写程序,但用 Java 回答就可以了

类似这样的:

def thread(x:String): Thread =
    {   
    val thread_ = new Thread {
          override def run {

              val str = some_processing(x)
              flink_producer(str)

      }
    }
    return thread_
 }

val stream = flink_consumer()

stream.map(x =>{

                 var i = 0
                 while(i < 10){

                                val th = thread(x)
                                th.start()
                                i = i+1
                                }

           })

因此,对于 flink 消费者中的每个输入,我想使用多线程向其他队列生成 10 条消息。

【问题讨论】:

    标签: apache-kafka apache-flink flink-streaming


    【解决方案1】:

    大多数 Flink 算子都是并行算子,因此您没有理由在数据管道中创建任何类型的线程,Flink 应该是管理算子可以存在多少并行实例以及是否要设置该值,您应该使用以下 API 方法。

    .setParallelism(N) //N is 10 for you,

    你可以得到more info in Fink documentation

    你应该这样做:

    1. 为您的集群配置添加更多任务管理器插槽
    2. 使用生成 10 条消息的 flatMap 代替地图
    3. 将 flatMap 运算符的并行度增加到 10。

    您的代码应如下所示:

    val stream = flink_consumer()
    
    stream.flatMap((x, out) =>{
                     var i = 0
                     while(i < 10){
                          val valueToCollect = process(x,i)
                          out.collect(valueToCollect)
                     }
    
               }).setParallelism(10)
               .map(doSomethingWithGeneratedValues)
               .addSink(sinkThatSendsDataToYourDesiredSystem)
    

    另一种方法,如果你知道你想要多少并行任务

    val stream = flink_consumer()
    
    val resultStream = stream.map(process)
    val sinkStream = resultStream.union(resultStream,resultStream,resultStream,...) // joins resultStream N times
    sinkStream.addSink(sinkThatSendsDataToYourDesiredSystem)
    

    最后,您还可以为一个 DataStream 设置多个接收器

    val stream = flink_consumer()
    
    val resultStream = stream.map(process)
    resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
    resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
    resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
    ...
    N
    ...
    resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
    

    如果您想对数据接收器进行并行写入,则必须确保您使用的接收器支持这种写入操作。

    【讨论】:

    • 感谢您的回答。根据我的理解,平面图将获取一条消费消息并在 process(x,i) 的帮助下生成 10 条唯一消息,然后使用 addink 向 kafka 发送 10 条消息。我将从一条消息中获得 Kafka 队列中的 10 条消息.那么这里的地图有什么用?
    • 是的,你是对的,我使用 flatMap 是因为我认为你想为每个线程生成一条新消息,如果你只想并行处理数据,一个简单的地图就可以了; )
    • 好的,我需要一个平面图,但有没有办法用并行迭代器替换 while 循环?
    • 您可以使用 scala 并行迭代器,但这会在 Flink 任务中创建线程,这看起来不是一个好习惯。因为,如果您的节点中有与 CPU 核心一样多的任务槽,那么创建大量线程会产生大量上下文切换,因此最终它会比正常的 while 循环慢。无论如何,我从未对这个主题做过基准测试,所以我不能 100% 确定
    • 另一个好的方法,可能是向地图操作员添加多个接收器。我要更新我的答案
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-08-26
    • 2013-11-10
    • 1970-01-01
    • 1970-01-01
    • 2018-10-15
    • 2018-09-24
    • 2017-02-01
    相关资源
    最近更新 更多