【问题标题】:Getting Started With Akka Streams开始使用 Akka 流
【发布时间】:2016-04-17 05:09:18
【问题描述】:

过去几天我一直在阅读有关 Akka Streams 的信息,过去几个月我一直在使用 Scala 中的 Rx 库。对我来说,这两个库所提供的内容似乎有些重叠。 RxScala 更容易上手、理解和使用。例如,这是一个简单的用例,我使用 Scala 的 Rx 库连接到 Kafka 主题,将其包装到 Observable 中,这样我就可以让订阅者收到这些消息。

val consumerStream = consumer.createMessageStreamsByFilter(topicFilter(topics), 1, keyDecoder, valueDecoder).head
val observableConsumer = Observable.fromIterator(consumerStream).map(_.message())

这是非常简单和整洁的。关于我应该如何开始使用 akka 流的任何线索?我想使用上面的相同示例,我想从源发出事件。稍后我将有一个 Flow 和一个 Sink。最后,在我的主类中,我将结合这 3 个来运行应用程序数据流。

有什么建议吗?

【问题讨论】:

    标签: scala rx-java akka-stream


    【解决方案1】:

    这就是我想出的:

    val kafkaStreamItr = consumer.createMessageStreamsByFilter(topicFilter(topics), 1, keyDecoder, valueDecoder).head
    Source.fromIterator(() => kafkaStreamItr).map(_.message)
    

    【讨论】:

      猜你喜欢
      • 2014-09-29
      • 1970-01-01
      • 1970-01-01
      • 2017-10-23
      • 1970-01-01
      • 1970-01-01
      • 2016-02-22
      • 1970-01-01
      • 2019-06-25
      相关资源
      最近更新 更多