【问题标题】:How do I hook up scalaz-streams to reactive streams (as in reactive-streams.org)如何将 scalaz-streams 连接到反应流(如在 reactive-streams.org 中)
【发布时间】:2016-04-27 22:26:22
【问题描述】:

我想通过 db.stream(yourquery) 通过 scalaz-stream 流式传输从 3.0.0 查询返回的数据。

看起来 reactive-streams.org 使用了不同库实现的 API 和数据流模型。

如果背压从 scalaz-stream 流程回流到 slick 发布者,您如何做到这一点?

【问题讨论】:

  • 您可以将问题概括为:如何将 scalaz-streams 连接到响应式流(如在 reactive-streams.org 中)

标签: slick scalaz-stream


【解决方案1】:

看看https://github.com/krasserm/streamz

Streamz 是 scalaz-stream 的资源组合库。它允许 Process 实例消费和生产到:

  • Apache Camel 端点
  • Akka 持久性日志和快照存储和
  • 具有完整背压支持的 Akka Stream 流(反应性流)

【讨论】:

    【解决方案2】:

    我终于回答了我自己的问题。如果您愿意使用 scalaz-streams 队列来排队流结果。

    def getData[T](publisher: slick.backend.DatabasePublisher[T],
      queue: scalaz.stream.async.mutable.Queue[T], batchRequest: Int = 1): Task[scala.concurrent.Future[Long]] =
      Task {
        val p = scala.concurrent.Promise[Unit]()
        var counter: Long = 0
        val s = new org.reactivestreams.Subscriber[T] {
          var sub: Subscription = _
    
          def onSubscribe(s: Subscription): Unit = {
            sub = s
            sub.request(batchRequest)
          }
    
          def onComplete(): Unit = {
            sub.cancel()
            p.success(counter)
          }
    
          def onError(t: Throwable): Unit = p.failure(t)
    
          def onNext(e: T): Unit = {
            counter += 1
            queue.enqueueOne(e).run
            sub.request(batchRequest)
          }
        }
        publisher.subscribe(s)
        p.future
      }
    

    当您使用run 运行此操作时,您将获得一个完成后表示查询完成流式传输的未来。如果您希望您的计算等待所有数据到达,您可以在这个未来上进行创作。您还可以在 getData 的任务中添加使用 Await,然后如果您需要在继续之前运行所有数据,则在返回的任务对象上组合您的计算。对于我所做的事情,我在未来的完成中撰写并关闭队列,以便我的 scalaz-stream 知道干净地终止。

    【讨论】:

      【解决方案3】:

      这是一个稍微不同的实现(与 user1763729 发布的那个),它返回一个进程:

      def getData[T](publisher: DatabasePublisher[T], batchSize: Long = 1L): Process[Task, T] = {
       val q = async.boundedQueue[T](10)
      
       val subscribe = Task.delay {
        publisher.subscribe(new Subscriber[T] {
      
          @volatile var subscription: Subscription = _
      
          override def onSubscribe(s: Subscription) {
            subscription = s
            subscription.request(batchSize)
          }
      
          override def onNext(next: T) = {
              q.enqueueOne(next).attemptRun
              subscription.request(batchSize)
          }
      
          override def onError(t: Throwable) = q.fail(t).attemptRun
      
          override def onComplete() = q.close.attemptRun
        })
       }
      
       Process.eval(subscribe).flatMap(_ => q.dequeue)
      }
      

      【讨论】:

        猜你喜欢
        • 2013-12-29
        • 1970-01-01
        • 2023-03-26
        • 2016-05-30
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-11-19
        相关资源
        最近更新 更多