【问题标题】:Pushing elements externally to a reactive stream in fs2将元素从外部推送到 fs2 中的反应流
【发布时间】:2019-05-02 09:56:36
【问题描述】:

我有一个外部(即我无法更改)Java API,如下所示:

public interface Sender {
    void send(Event e);
}

我需要实现一个Sender,它接受每个事件,将其转换为 JSON 对象,将其中一些事件收集到一个包中,然后通过 HTTP 发送到某个端点。这一切都应该异步完成,send() 不会阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。

使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将其具体化并使用具体化的ActorRef 将新事件推送到流中:

lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def send(e: Event): Unit = {
  eventsActor ! e
}

这里的CustomBuffer 是一个自定义的GraphStage,它与库提供的Buffer 非常相似,但根据我们的特定需求量身定制;对于这个特定的问题,它可能无关紧要。

如您所见,从非流代码与流交互非常简单 - ActorRef 特征上的 ! 方法是异步的,不需要调用任何额外的机制。然后通过整个反应管道处理发送给参与者的每个事件。此外,由于 akka-http 的实现方式,我什至可以免费获得连接池,因此与服务器打开的连接不超过一个。

但是,我找不到正确使用 FS2 执行相同操作的方法。即使放弃缓冲的问题(我可能需要编写一个自定义的Pipe 实现,它会做我们需要的其他事情)和 HTTP 连接池,我仍然坚持一个更基本的事情——即如何推送数据“从外部”传输到反应流。

我能找到的所有教程和文档都假设整个程序发生在某个效果上下文中,通常是IO。这不是我的情况 - Java 库在未指定的时间调用 send() 方法。因此,我不能将所有内容都保存在一个 IO 操作中,我必须在 send() 方法中完成“推送”操作,并将反应流作为一个单独的实体,因为我想聚合事件并希望池HTTP 连接(我相信它自然与反应流相关联)。

我假设我需要一些额外的数据结构,比如Queue。 fs2 确实有某种fs2.concurrent.Queue,但同样,所有文档都显示了如何在单个IO 上下文中使用它,所以我假设做类似的事情

val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()

然后在流定义中使用queue,然后在send()方法中单独使用unsafeRun调用:

val eventPipeline = queue.dequeue
  .through(customBuffer(bufferSize))
  .groupWithin(batchSize, flushDuration)
  .map(toBundle)
  .mapAsyncUnordered(1)(sendRequest)
  .evalTap(response => ...)
  .compile
  .drain

eventPipeline.unsafeRunAsync(...)  // or something

override def send(e: Event) {
  queue.enqueue(e).unsafeRunSync()
}

不是正确的方法,而且很可能根本行不通。

那么,我的问题是,如何正确使用 fs2 来解决我的问题?

【问题讨论】:

    标签: scala akka-stream reactive-streams fs2


    【解决方案1】:

    我对那个库没有太多经验,但它应该看起来像这样:

    import cats.effect.{ExitCode, IO, IOApp}
    import fs2.concurrent.Queue
    
    case class Event(id: Int)
    
    class JavaProducer{
    
      new Thread(new Runnable {
        override def run(): Unit = {
          var id = 0
          while(true){
            Thread.sleep(1000)
            id += 1
            send(Event(id))
          }
        }
      }).start()
    
      def send(event: Event): Unit ={
        println(s"Original producer prints $event")
      }
    }
    
    class HackedProducer(queue: Queue[IO, Event]) extends JavaProducer {
      override def send(event: Event): Unit = {
        println(s"Hacked producer pushes $event")
        queue.enqueue1(event).unsafeRunSync()
        println(s"Hacked producer pushes $event - Pushed")
      }
    
    }
    
    object Test extends IOApp{
      override def run(args: List[String]): IO[ExitCode] = {
        val x: IO[Unit] = for {
          queue <- Queue.unbounded[IO, Event]
          _ = new HackedProducer(queue)
          done <- queue.dequeue.map(ev => {
            println(s"Got $ev")
          }).compile.drain
        } yield done
        x.map(_ => ExitCode.Success)
      }
    
    }
    

    【讨论】:

    • 这正是我正在谈论的问题 - 我没有任何生产者可以在单独的线程中进行初始化。我正在与之交互的 Java 库是这个生产者,我无法控制它的实例化方式。
    • 现在我遇到了你的问题。如果您在队列初始化后不调用unsafeRunAsync,那么您的初始解决方案可能会起作用。只有当你推动元素并结束时。
    • 如果我不在eventPipeline 上运行unsafeRunAsync(实际上,在我当前的实现中,我将使用unsafeRunCancellable,因为由于API 要求,我需要一种方法来关闭此管道) 那么流将如何开始呢?或者,如果您的意思是不运行Queue.unbounded[..].unsafeRunSync(),那么我如何获取队列实例?无论如何,我目前正在尝试。
    • 是的,我的意思是不运行Queue.unbounded[..].unsafeRunSync()。示例在我的回答中。一旦你开始for,你就进入了 IO monad。
    • 那我的情况还是没有解决:) 在这种情况下,如何在send() 方法中获取queue 的实例?如果我在send() 中运行queue.unsafeRunSync(),那么我将在每次send() 调用期间获得一个单独的Queue 实例,它不会以任何方式与事件管道连接。
    【解决方案2】:

    考虑以下示例:

    import cats.implicits._
    import cats.effect._
    import cats.effect.implicits._
    import fs2._
    import fs2.concurrent.Queue
    
    import scala.concurrent.ExecutionContext
    import scala.concurrent.duration._
    
    object Answer {
      type Event = String
    
      trait Sender {
        def send(event: Event): Unit
      }
    
      def main(args: Array[String]): Unit = {
        val sender: Sender = {
          val ec = ExecutionContext.global
          implicit val cs: ContextShift[IO] = IO.contextShift(ec)
          implicit val timer: Timer[IO] = IO.timer(ec)
    
          fs2Sender[IO](2)
        }
    
        val events = List("a", "b", "c", "d")
        events.foreach { evt => new Thread(() => sender.send(evt)).start() }
        Thread sleep 3000
      }
    
      def fs2Sender[F[_]: Timer : ContextShift](maxBufferedSize: Int)(implicit F: ConcurrentEffect[F]): Sender = {
        // dummy impl
        // this is where the actual logic for batching
        //   and shipping over the network would live
        val consume: Pipe[F, Event, Unit] = _.evalMap { event =>
          for {
            _ <- F.delay { println(s"consuming [$event]...") }
            _ <- Timer[F].sleep(1.seconds)
            _ <- F.delay { println(s"...[$event] consumed") }
          } yield ()
        }
    
        val suspended = for {
          q <- Queue.bounded[F, Event](maxBufferedSize)
          _ <- q.dequeue.through(consume).compile.drain.start
          sender <- F.delay[Sender] { evt =>
            val enqueue = for {
              wasEnqueued <- q.offer1(evt)
              _ <- F.delay { println(s"[$evt] enqueued? $wasEnqueued") }
            } yield ()
            enqueue.toIO.unsafeRunAsyncAndForget()
          }
        } yield sender
    
        suspended.toIO.unsafeRunSync()
      }
    }
    

    主要思想是使用来自 fs2 的并发队列。注意,上面的代码表明Sender接口和main中的逻辑都不能改变。只能换出Sender 接口的实现。

    【讨论】:

    • 这几乎就是我最终要做的事情,也是我最初在问题中描述的我认为行不通的事情。它确实有效,所以我相信这确实是正确的方法。
    【解决方案3】:

    我们可以创建一个有界队列,它将消耗来自发送方的元素,并使它们可用于 fs2 流处理。

    
    import cats.effect.IO
    import cats.effect.std.Queue
    
    import fs2.Stream
    
    trait Sender[T]:
        def send(e: T): Unit
    
    object Sender:
         def apply[T](bufferSize: Int): IO[(Sender[T], Stream[IO, T])] =
             for
                 q <- Queue.bounded[IO, T](bufferSize)
             yield
                 val sender: Sender[T] = (e: T) => q.offer(e).unsafeRunSync()
                 def stm: Stream[IO, T] = Stream.eval(q.take) ++ stm
                 (sender, stm)
    

    然后我们将有两个目的 - 一个用于 Java 世界,将新元素发送到 Sender。另一个 - 用于 fs2 中的流处理。

    class TestSenderQueue:
    
        @Test def testSenderQueue: Unit =
            val (sender, stream) = Sender[Int](1)
              .unsafeRunSync()// we have to run it preliminary to make `sender` available to external system
            
            val processing = 
                stream
                    .map(i => i * i)
                    .evalMap{ ii => IO{ println(ii)}}
            sender.send(1)
                    
            processing.compile.toList.start//NB! we start processing in a separate fiber
                .unsafeRunSync() // immediately right now.
            sender.send(2)
            Thread.sleep(100)
            (0 until 100).foreach(sender.send)
            println("finished")
    

    请注意,我们在当前线程中推送数据,并且必须在单独的线程中运行 fs2 (.start)。

    【讨论】:

    • 这几乎就是我最终做的事情,以及其他答案中描述的内容,以及我最初在答案中描述为我认为不起作用的内容:) 但显然似乎是正确的方法,它确实有效。还是谢谢)
    猜你喜欢
    • 2023-03-22
    • 2021-06-24
    • 2018-03-13
    • 2022-12-22
    • 1970-01-01
    • 2021-07-29
    • 1970-01-01
    • 2023-03-26
    • 2015-10-16
    相关资源
    最近更新 更多