【问题标题】:How to minimize memory usage in akka streams如何最小化akka流中的内存使用
【发布时间】:2020-02-27 10:43:47
【问题描述】:

我有一个流,它会在某些时候将对象分组以创建文件。 我想我可以通过在流的早期序列化对象来压缩一些字节。 但我最大的问题是如何优化这样的流的内存占用:

val sourceOfCustomer = Source.repeat(Customer(name = "test"))
def serializeCustomer(customer: Customer) = customer.toString

sourceOfCustomers
.via(serializeCustomer) // 1KB
.grouped(1000000) // 1GB
.via(processFile) // 1GB
.via(moreProcessing) // 1GB
.via(evenMoreProcessing) // 1GB
.to(fileSink) // 1GB

这使我在稳定状态下的内存使用量至少 5GB。这是正确的吗?

我可以使用什么策略将其仅限制为 1 或 2GB?原则上应该可以通过折叠操作符来实现。

注意:我知道一个解决方案是使组更小,但让我们考虑组的大小是问题的约束。

【问题讨论】:

  • 将每个计算输入/输出分解为更细粒度的部分。
  • 请提供sourceOfCustomersserializeCustomer 的类型以及他们在做什么的简短描述。谢谢!
  • @IvanKurchenko 他们可以是任何东西,但我用他们的描述更新了问题

标签: scala akka akka-stream


【解决方案1】:

抱歉,我可能遗漏了一些东西,但我没有在最新的 Akka Stream 文档中找到 group 操作,我猜你的意思是 grouped 操作:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/grouped.html

如果是这样,那么这意味着在.grouped(1000000) // 1GB,您在流中创建了一组元素,这些元素可以同时处理,因此一个多于一组 1GB 大小的元素可以在一瞬间出现在内存中。因此,为了将流中的内存占用限制在 1GB 以内,您可以采用以下两种方法之一:

1) 减少同时处理的大型组的数量。 这可以通过throttle 操作来实现:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/throttle.html#throttle 请以代码 sn-p 为例

import scala.concurrent.duration._
...

.group(1000000) // 1GB
.throttle(1, 1 minute)

2) 减少大组大小

val parallelismLevel = Runtime.getRuntime.availableProcessors() // or another custom level which represents stream processing parallelism
val baseGroupSize = 1000000 // 1GB
val groupSize =  baseGroupSize / parallelismLevel 

sourceOfCustomers
.via(serializeCustomer) // 1KB
.group(groupSize)

希望这会有所帮助!

【讨论】:

  • 谢谢,我不认为这是一个解决方案,但它绝对是一种解决方法。我猜你不想在解决方案中牺牲速度。文件的大小也是问题的一个约束。
  • @gurghet 对不起,我不知道文件大小有限制。
  • @gurghet 好吧,我不想牺牲速度 - 但从我的角度来看,这是memory-performance 平衡问题的示例。如果您想在内存中处理 1G 文件并且您根本不想使用超过 1G 的内存,您将需要一次处理 1 个文件。还是我错过了什么?
  • 我不想把它变成一个对话,但是你是对的。只是指出有更好的解决方案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-04-03
  • 2023-03-26
  • 2011-06-06
  • 1970-01-01
  • 2012-04-29
相关资源
最近更新 更多