【问题标题】:How cancel akka stream within play framework conroller如何在播放框架控制器中取消 akka 流
【发布时间】:2018-07-02 18:14:49
【问题描述】:

我有一个流,应该从 http api 控制(开始,停止,只有一个实例)。响应应流式传输到客户端。这里带有播放框架控制器的代码:

  class Processor{

    def job(): Source[Int, NotUsed] ={
      stop()
      Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
    }

    def stop(): Unit ={
      //TODO
    }
  }

  class MyController(process: Processor) {

    def startJob = Action {
      val source = process.job()
      Ok.chunked(source)
    }

    def cancell = Action {
      process.cancel()
      Ok("canceled")
    }
  }

我需要取消工作的能力。当客户端关闭连接时,作业不应取消 - 就像日志输出一样。我读到了KillSwitches,但不明白如何将它与接受Source 的播放控制器一起使用。有什么帮助吗?

我想我需要一些输出源,不同于 Job 源。

【问题讨论】:

    标签: playframework akka-stream


    【解决方案1】:

    我使用Monix Observable 执行我的任务。通过操作,我可以运行、取消和连接到正在运行的流。无论如何,我对用于教育目的的 akka-stream 解决方案感兴趣。这里monix解决方案:

    class StreamService(implicit ec: Scheduler) {
    
      private val runningStream: AtomicAny[Option[RunningStream]] = AtomicAny(None)
    
      def run(): Option[Source[ByteString, NotUsed]] =
        runningStream.get match {
          case None =>
            val observable = Observable
              .interval(1.seconds)
              .map(_.toString)
              .doOnTerminate(cb => runningStream.set(None))
              .doOnSubscriptionCancel(() => runningStream.set(None))
              .publish
    
            val cancelable_ = observable.connect()
    
            this.runningStream.set(Some(RunningStream(cancelable_, observable)))
            connect()
          case _ => None
        }
    
      def connect(): Option[Source[ByteString, NotUsed]] =
        runningStream.get
          .map(rs => rs.observable.toReactivePublisher)
          .map(publisher => Source.fromPublisher(publisher).map(ByteString(_)))
    
      def cancel(): Unit =
        runningStream.get.foreach(_.cancelable.cancel())
    
    }
    
    object StreamService {
      case class RunningStream(cancelable: Cancelable, observable: ConnectableObservable[String])
    }
    
    
    class SomeController @Inject()(streamService: StreamService, cc: ControllerComponents)
      extends AbstractController(cc) {
    
      def run() = Action {
        val source = streamService.run().getOrElse(throw new RuntimeException("Stream already running"))
        Ok.chunked(source)
      }
    
      def connect() = Action {
        val source = streamService.connect().getOrElse(throw new RuntimeException("Stream not running"))
        Ok.chunked(source)
      }
    
      def cancel() = Action {
        streamService.cancel()
        Ok("ok")
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-08-21
      相关资源
      最近更新 更多