【问题标题】:using streams vs actors for periodic tasks使用流与参与者进行周期性任务
【发布时间】:2020-07-13 04:09:26
【问题描述】:

我正在使用 akka/scala/play 堆栈。

通常,我使用流来执行某些任务。例如,我有一个每分钟唤醒一次的流,从数据库中获取一些内容,然后调用另一个服务来使用 API 丰富其数据并将丰富的内容保存到数据库中。

类似这样的:

class FetcherAndSaveStream @Inject()(fetcherAndSaveGraph: FetcherAndSaveGraph, dbElementsSource: DbElementsSource)
                                     (implicit val mat: Materializer,
                                      implicit val exec: ExecutionContext) extends LazyLogging {

  def graph[M1, M2](source: Source[BDElement, M1],
                    sink: Sink[BDElement, M2],
                    switch: SharedKillSwitch): RunnableGraph[(M1, M2)] = {

    val fetchAndSaveDataFromExternalService: Flow[BDElement, BDElement, NotUsed] =
      fetcherAndSaveGraph.fetchEndSaveEnrichment

    source.viaMat(switch.flow)(Keep.left)
      .via(fetchAndSaveDataFromExternalService)
      .toMat(sink)(Keep.both).withAttributes(supervisionStrategy(resumingDecider))

  }


  def runGraph(switchSharedKill: SharedKillSwitch): (NotUsed, Future[Done]) = {
    logger.info("FetcherAndSaveStream is now running")
    graph(dbElementsSource.dbElements(), Sink.ignore, switchSharedKill).run()
  }
}

我想知道,这是否比只使用一个每分钟都在滴答作响的演员做类似的事情更好?为此使用演员和流之间的比较是什么?

试图弄清楚我什么时候应该选择哪种方法(流/演员)。谢谢!!

【问题讨论】:

标签: scala playframework akka actor akka-stream


【解决方案1】:

您可以同时使用这两种方法,具体取决于您对解决方案的要求,这些要求未在此处列出。您需要考虑的一般问题 - 演员比流更多低级的东西,所以他们需要更多的代码和调试。

基本上,流适用于需要处理大量数据且内存消耗较低的任务。使用流,您不需要在每个n 秒开始流式传输,您可以将此流设置为与应用程序一起运行。通过省略调度程序逻辑,这可以使您的代码更加简洁。 我将省略你的 DI 和架构的东西,用伪代码编写解决方案:

val yourConsumer: Sink[YourDBRecord] = ???
val recordsSource: Source[YourDBRecord] = 
val runnableGraph = (Source repeat ())
  .throttle(1, n seconds)
  .mapAsync(yourParallelism){_ =>
    fetchReasonableAmountOfRecordsFromDB
  } mapConcat identity to yourConsumer

这个流会做你的事情。您甚至可以使用更复杂的逻辑来增强它,以使用图形 API 中的反馈循环根据工作负载调整轮询率。此外,您可以添加需要在流崩溃的地方恢复的错误处理策略。

此外,DBS 的 alpakka 连接器能够做到这一点,您可以查看那里的解决方案是否符合您的目的,或检查实施细节。

这样做您可以获得什么 - 背压、处理流的能力、干净简洁的代码,无需您直接管理定时自动机。 https://doc.akka.io/docs/akka/current/stream/stream-rate.html

你也可以创建一个actor,但是你应该手动完成akka流为你做的所有事情,即背压,以防你想与流、调度程序、分块和内存管理进行互操作(不加载一批100000个左右的条目到内存中)等等。

【讨论】:

  • 我实际上是在应用程序启动时运行图表。而且我的流源有 tickInterval ,我可以设置为我想要的任何内容,并且我将其设置为 1 分钟
  • 您的应用似乎有一个滴答间隔来运行流,因为流本身没有轮询逻辑。建议的解决方案将该逻辑放入流中,从而减少整体代码量。
  • Source.tick(0 seconds, n seconds, ())(Source repeat ()).throttle(1, n seconds) 更清晰、更高效。另外,tick 的物化器为您提供了一种取消整个事情的好方法。
猜你喜欢
  • 1970-01-01
  • 2016-02-04
  • 2014-10-28
  • 1970-01-01
  • 2021-02-05
  • 2013-06-08
  • 1970-01-01
  • 2019-05-25
  • 1970-01-01
相关资源
最近更新 更多