【问题标题】:Idiomatic way to use Spark DStream as Source for an Akka stream使用 Spark DStream 作为 Akka 流的源的惯用方式
【发布时间】:2016-01-27 17:52:52
【问题描述】:

我正在构建一个 REST API,它在 Spark 集群中开始一些计算,并以结果的分块流进行响应。给定带有计算结果的 Spark 流,我可以使用

dstream.foreachRDD()

从 Spark 发送数据。我正在使用 akka-http 发送分块的 HTTP 响应:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

为简单起见,我先尝试使用纯文本,稍后再添加 JSON 编组。

但是,使用 Spark DStream 作为 Akka 流的源的惯用方式是什么?我想我应该可以通过套接字来实现,但由于 Spark 驱动程序和 REST 端点位于同一个 JVM 上,为此打开一个套接字似乎有点矫枉过正。

【问题讨论】:

    标签: scala spark-streaming akka-stream akka-http


    【解决方案1】:

    在提问时不确定 api 的版本。但是现在,有了 akka-stream 2.0.3,我相信你可以这样做:

    val source = Source
      .actorRef[T](/* buffer size */ 100, OverflowStrategy.dropHead)
      .mapMaterializedValue[Unit] { actorRef =>
        dstream.foreach(actorRef ! _)
      }
    

    【讨论】:

      【解决方案2】:

      编辑:此答案仅适用于旧版本的 spark 和 akka。 PH88 的答案是最新版本的正确方法。

      您可以使用中间的akka.actor.Actor 来提供源(类似于this question)。下面的解决方案不是“反应式”的,因为底层 Actor 需要维护一个 RDD 消息缓冲区,如果下游 http 客户端没有足够快地消耗块,则可能会丢弃这些消息。但是无论实现细节如何,都会出现此问题,因为您无法将 akka 流背压的“节流”连接到 DStream 以减慢数据速度。这是因为 DStream 没有实现 org.reactivestreams.Publisher

      基本拓扑是:

      DStream --> Actor with buffer --> Source
      

      要构建此拓扑,您必须创建一个类似于实现 here 的 Actor:

      //JobManager definition is provided in the link
      val actorRef = actorSystem actorOf JobManager.props 
      

      基于JobManager创建一个ByteStrings(消息)的流Source。此外,将 ByteString 转换为 HttpEntity.ChunkStreamPart,这是 HttpResponse 需要的:

      import akka.stream.actor.ActorPublisher
      import akka.stream.scaladsl.Source
      import akka.http.scaladsl.model.HttpEntity
      import akka.util.ByteString
      
      type Message = ByteString
      
      val messageToChunkPart = 
        Flow[Message].map(HttpEntity.ChunkStreamPart(_))
      
      //Actor with buffer --> Source
      val source : Source[HttpEntity.ChunkStreamPart, Unit] = 
        Source(ActorPublisher[Message](actorRef)) via messageToChunkPart
      

      将 Spark DStream 链接到 Actor,以便将每个传入的 RDD 转换为 ByteString 的 Iterable,然后转发给 Actor:

      import org.apache.spark.streaming.dstream.Dstream
      import org.apache.spark.rdd.RDD
      
      val dstream : DStream = ???
      
      //This function converts your RDDs to messages being sent
      //via the http response
      def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???
      
      def sendMessageToActor(message : Message) = actorRef ! message
      
      //DStream --> Actor with buffer
      dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}
      

      为 HttpResponse 提供 Source:

      val requestHandler: HttpRequest => HttpResponse = {
        case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
          HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
      }
      

      注意:dstream foreachRDD 行和 HttpReponse 之间的时间/代码应该很少,因为在执行 foreach 行之后,Actor 的内部缓冲区将立即开始填充来自 DStream 的 ByteString 消息。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-11-10
        • 2019-04-03
        • 1970-01-01
        • 2022-06-10
        • 1970-01-01
        • 2017-05-12
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多