编辑:此答案仅适用于旧版本的 spark 和 akka。 PH88 的答案是最新版本的正确方法。
您可以使用中间的akka.actor.Actor 来提供源(类似于this question)。下面的解决方案不是“反应式”的,因为底层 Actor 需要维护一个 RDD 消息缓冲区,如果下游 http 客户端没有足够快地消耗块,则可能会丢弃这些消息。但是无论实现细节如何,都会出现此问题,因为您无法将 akka 流背压的“节流”连接到 DStream 以减慢数据速度。这是因为 DStream 没有实现 org.reactivestreams.Publisher 。
基本拓扑是:
DStream --> Actor with buffer --> Source
要构建此拓扑,您必须创建一个类似于实现 here 的 Actor:
//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props
基于JobManager创建一个ByteStrings(消息)的流Source。此外,将 ByteString 转换为 HttpEntity.ChunkStreamPart,这是 HttpResponse 需要的:
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString
type Message = ByteString
val messageToChunkPart =
Flow[Message].map(HttpEntity.ChunkStreamPart(_))
//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] =
Source(ActorPublisher[Message](actorRef)) via messageToChunkPart
将 Spark DStream 链接到 Actor,以便将每个传入的 RDD 转换为 ByteString 的 Iterable,然后转发给 Actor:
import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD
val dstream : DStream = ???
//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???
def sendMessageToActor(message : Message) = actorRef ! message
//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}
为 HttpResponse 提供 Source:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
注意:dstream foreachRDD 行和 HttpReponse 之间的时间/代码应该很少,因为在执行 foreach 行之后,Actor 的内部缓冲区将立即开始填充来自 DStream 的 ByteString 消息。