【问题标题】:How to create discrete streams in fs2 Functional Stream for Scala?如何在 Scala 的 fs2 功能流中创建离散流?
【发布时间】:2017-08-04 17:08:57
【问题描述】:

是否可以在 fs2 中创建离散事件流?如果是这样怎么做。 我刚开始玩图书馆,我知道我有很多东西要学习。但我没有看到任何相关的例子。例如我想在 scalajsswing 中为“mousemove”或“click”创建一个流。 我正在寻找类似 RxJS 的东西,我可以使用 Rx.Observable.create 来创建离散事件,例如:

//note: pseudo code
var mouse = Rx.Observable.create( subscriber => {
     document.body.addEventListener("mousemove", event =>{
      subscriber.onNext(event)
 })
} ) 

fs2 中的等价物可能不是那么微不足道,但如果有人能建议我怎么做的话。我猜它会使用 HandlerPull/Push 数据类型,但我不知道如何使用。

干杯。

【问题讨论】:

    标签: scala functional-programming stream system.reactive fs2


    【解决方案1】:

    这是我想出的一个示例,它演示了如何将 fs2 与 JavaFX 一起使用:

    import cats.implicits._
    import cats.effect._
    import cats.effect.implicits._
    import javafx.application.{Application, Platform}
    import javafx.scene.{Node, Scene}
    import javafx.scene.layout._
    import javafx.stage.Stage
    import fs2._
    import fs2.concurrent._
    import javafx.beans.value.WritableValue
    import javafx.scene.control.{Label, TextField}
    import javafx.scene.input.KeyEvent
    
    import scala.concurrent.ExecutionContext
    
    import scala.util.Try
    
    class Fs2Ui extends Application {
      override def start(primaryStage: Stage): Unit = {
        implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
        implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
    
        new Logic[IO]().run(primaryStage).start.unsafeRunSync()
      }
    
      class Logic[F[_]: ConcurrentEffect: ContextShift: Timer] {
        import Fs2Ui._
        import java.time.{Duration, Instant}
        import java.util.concurrent.TimeUnit.MILLISECONDS
    
        def run(primaryStage: Stage): F[Unit] = for {
          v <- initializeUi(primaryStage)
          View(input, feedback) = v
    
          _ <- Stream(input).covary[F]
            .through(typedChars)
            .through(processInput)
            .through(displayFeedback(feedback.textProperty))
            .compile.drain
        } yield ()
    
        private def initializeUi(primaryStage: Stage): F[View] = updateUi {
          val input = new TextField()
          input.setPrefWidth(300)
          val feedback = new Label("...")
    
          val vbox = new VBox(input, feedback)
          val root = new StackPane(vbox)
          val scene = new Scene(root)
    
          primaryStage.setScene(scene)
          primaryStage.show()
    
          View(input, feedback)
        }
    
        private def processInput: Pipe[F, TypedChar, Feedback] = for {
          typed <- _
          _ <- Stream.eval(ContextShift[F].shift)
          res <- Stream.eval { time(processSingle(typed)) }
          (d, Feedback(str)) = res
        } yield Feedback(s"$str in [$d]")
    
        private def displayFeedback(value: WritableValue[String]): Pipe[F, Feedback, Unit] =
          _.map { case Feedback(str) => str } through updateValue(value)
    
        private def time[A](f: F[A]): F[(Duration, A)] = {
          val now = Timer[F].clock.monotonic(MILLISECONDS).map(Instant.ofEpochMilli)
          for {
            start <- now
            a <- f
            stop <- now
            d = Duration.between(start, stop)
          } yield (d, a)
        }
    
        private val processSingle: TypedChar => F[Feedback] = {
          import scala.util.Random
          import scala.concurrent.duration._
    
          val prng = new Random()
          def randomDelay: F[Unit] = Timer[F].sleep { (250 + prng.nextInt(750)).millis }
    
          c => randomDelay *> Sync[F].delay(Feedback(s"processed $c"))
        }
      }
    }
    
    object Fs2Ui {
      case class View(input: TextField, feedback: Label)
    
      case class TypedChar(value: String)
      case class Feedback(value: String)
    
      private def typedChars[F[_]: ConcurrentEffect]: Pipe[F, Node, TypedChar] = for {
        node <- _
        q <- Stream.eval(Queue.unbounded[F, KeyEvent])
        _ <- Stream.eval(Sync[F].delay {
          node.setOnKeyTyped { evt => (q enqueue1 evt).toIO.unsafeRunSync() }
        })
        keyEvent <- q.dequeue
      } yield TypedChar(keyEvent.getCharacter)
    
      private def updateValue[F[_]: Async, A](value: WritableValue[A]): Pipe[F, A, Unit] = for {
        a <- _
        _ <- Stream.eval(updateUi(value setValue a))
      } yield ()
    
      private def updateUi[F[_]: Async, A](action: => A): F[A] =
        Async[F].async[A] { cb =>
          Platform.runLater { () =>
            cb(Try(action).toEither)
          }
        }
    }
    

    演示 fs2 和 JavaFX 之间绑定的具体部分是两个 Pipes:typedCharsupdateValue。就我个人而言,我认为最具挑战性的部分是使 KeyEvent 侦听器看起来像 fs2 Stream 事件:

    node.setOnKeyTyped { evt => (q enqueue1 evt).toIO.unsafeRunSync() }
    

    【讨论】:

    • 这很有趣,我会尝试一下,看看它是否符合我的需要。干杯@Andrey
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-07-04
    • 1970-01-01
    • 2020-10-10
    • 1970-01-01
    • 2014-01-22
    • 1970-01-01
    • 2019-06-14
    相关资源
    最近更新 更多