【发布时间】:2019-05-02 09:56:36
【问题描述】:
我有一个外部(即我无法更改)Java API,如下所示:
public interface Sender {
void send(Event e);
}
我需要实现一个Sender,它接受每个事件,将其转换为 JSON 对象,将其中一些事件收集到一个包中,然后通过 HTTP 发送到某个端点。这一切都应该异步完成,send() 不会阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。
使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将其具体化并使用具体化的ActorRef 将新事件推送到流中:
lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
.via(CustomBuffer(bufferSize)) // buffer all events
.groupedWithin(batchSize, flushDuration) // group events into chunks
.map(toBundle) // convert each chunk into a JSON message
.mapAsyncUnordered(1)(sendHttpRequest) // send an HTTP request
.toMat(Sink.foreach { response =>
// print HTTP response for debugging
})(Keep.both)
lazy val (eventsActor, completeFuture) = eventPipeline.run()
override def send(e: Event): Unit = {
eventsActor ! e
}
这里的CustomBuffer 是一个自定义的GraphStage,它与库提供的Buffer 非常相似,但根据我们的特定需求量身定制;对于这个特定的问题,它可能无关紧要。
如您所见,从非流代码与流交互非常简单 - ActorRef 特征上的 ! 方法是异步的,不需要调用任何额外的机制。然后通过整个反应管道处理发送给参与者的每个事件。此外,由于 akka-http 的实现方式,我什至可以免费获得连接池,因此与服务器打开的连接不超过一个。
但是,我找不到正确使用 FS2 执行相同操作的方法。即使放弃缓冲的问题(我可能需要编写一个自定义的Pipe 实现,它会做我们需要的其他事情)和 HTTP 连接池,我仍然坚持一个更基本的事情——即如何推送数据“从外部”传输到反应流。
我能找到的所有教程和文档都假设整个程序发生在某个效果上下文中,通常是IO。这不是我的情况 - Java 库在未指定的时间调用 send() 方法。因此,我不能将所有内容都保存在一个 IO 操作中,我必须在 send() 方法中完成“推送”操作,并将反应流作为一个单独的实体,因为我想聚合事件并希望池HTTP 连接(我相信它自然与反应流相关联)。
我假设我需要一些额外的数据结构,比如Queue。 fs2 确实有某种fs2.concurrent.Queue,但同样,所有文档都显示了如何在单个IO 上下文中使用它,所以我假设做类似的事情
val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()
然后在流定义中使用queue,然后在send()方法中单独使用unsafeRun调用:
val eventPipeline = queue.dequeue
.through(customBuffer(bufferSize))
.groupWithin(batchSize, flushDuration)
.map(toBundle)
.mapAsyncUnordered(1)(sendRequest)
.evalTap(response => ...)
.compile
.drain
eventPipeline.unsafeRunAsync(...) // or something
override def send(e: Event) {
queue.enqueue(e).unsafeRunSync()
}
不是正确的方法,而且很可能根本行不通。
那么,我的问题是,如何正确使用 fs2 来解决我的问题?
【问题讨论】:
标签: scala akka-stream reactive-streams fs2