【问题标题】:How to 'Chunk and Re-assmble' large messages in Reactive Kafka using Akka-Stream如何使用 Akka-Stream 在 Reactive Kafka 中“分块和重新组合”大消息
【发布时间】:2017-02-09 15:02:02
【问题描述】:

使用 Kafka 发送大文件时,是否可以跨分区分发,然后使用 Akka-Stream 重新组装?如本演示文稿所述:

http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297

【问题讨论】:

    标签: scala apache-kafka akka-stream reactive-kafka


    【解决方案1】:

    “分块”方面,即生产者,很容易使用 reactive kafka 之类的东西编写:

    case class LargeMessage(bytes : Seq[Byte], topic : String)
    
    def messageToKafka(message : LargeMessage, maxMessageSize : Int) = 
      Source.fromIterator(() => message.bytes.toIterator)
            .via(Flow[Byte].grouped(maxMessageSize))
            .via(Flow[Seq[Byte]].map(seq => new ProducerRecord(message.topic, seq)))
            .runWith(Producer.plainSink(producerSettings)
    

    “重新组装”,即消费者,可以用类似于the documentation的方式实现:

       val messageFut : Future[LargeMessage] = 
         for {
           bytes <- Consumer.map(_._1).runWith(Sink.seq[Byte])
         } yield LargeMessage(bytes, topic)
    

    【讨论】:

    • 这是否保证消息的顺序,因为不保证跨分区的顺序?
    猜你喜欢
    • 2021-07-09
    • 2018-02-21
    • 1970-01-01
    • 2018-02-16
    • 1970-01-01
    • 1970-01-01
    • 2019-03-15
    • 2018-01-19
    • 2017-04-29
    相关资源
    最近更新 更多