【问题标题】:How do Actor Systems function to prevent memory overflow from queues but also prevent threads blocking on writing on the queues?Actor Systems 如何防止队列中的内存溢出,同时防止线程阻塞写入队列?
【发布时间】:2020-04-28 11:33:41
【问题描述】:

Actor 相互发送消息。如果队列是有限的,那么在尝试写入/发送到满队列时会发生什么?阻塞还是丢弃?如果它们不受限制,则可能会发生内存崩溃。多少可配置?

【问题讨论】:

    标签: erlang akka message-queue distributed-computing actor


    【解决方案1】:

    Akka 中的默认邮箱没有限制,因此不会防止内存崩溃。但是,您可以将参与者配置为使用不同的邮箱,其中既有邮箱在达到最大大小时丢弃(传递给死信)消息,也有阻止(我不建议使用这些)的邮箱。您可以在此处的文档中找到 Akka 附带的所有邮箱实现:https://doc.akka.io/docs/akka/current/typed/mailboxes.html#mailbox-implementations

    【讨论】:

    • 确实如此。除非应用程序逻辑这样做,否则 ActorSystems 不能被背压,但是 Reactive Streams 是背压应用程序的方式,例如 AkkaStreams,它在后台使用 Akka ActorSystems。
    【解决方案2】:

    您可以轻松测试 Erlang VM 在这种情况下的行为。在外壳中:

    F = fun F() -> receive done -> ok end end,
    P = spawn(F),
    G = fun G(Pid,Size,Wait) -> Pid ! lists:seq(1,Size), receive done -> ok after Wait -> G(Pid,Size,Wait) end end,
    H = fun(Pid,Size,Wait) -> T = fun() -> G(Pid,Size,Wait) end, spawn(T) end,
    D = fun D() -> io:format("~p~n~p~n",[erlang:time(),erlang:memory(processes_used)]), receive done -> ok after 10000 -> D() end end,
    P1 = spawn(D).
    
    P2 = H(P,100000,5).
    

    您将看到您收到内存分配异常,VM 写入核心转储并崩溃。

    我没有检查如何修改限制,如果你尝试一下,你会看到它需要达到非常高的邮件数量,使用邮箱中的数十 GB 内存。

    如果你遇到这种情况,我不认为第一反应是增加尺寸,你应该先寻找

    • 未读消息,
    • 进程瓶颈
    • 应用架构
    • Erlang 是否适合您的问题
    • ...

    【讨论】:

      【解决方案3】:

      erlang中的actor队列没有限制,这受VM的内存大小限制,如果VM中的内存大小已满VM崩溃。对于监控或管理内存分配和 CPU 负载,您可以在 Erlang 中使用 os_mon

      你可以在 erlang shell 中测试

      F = fun() -> timer:sleep(60000),
                   {message_queue_len, InboxLen} = erlang:process_info(self(), message_queue_len),
                    io:format("Len ===> ~p", [InboxLen]) 
          end.
      PID = erlang:spawn(F).
      [PID ! "hi" || _ <- lists:seq(1, 50000)].
      

      如果你增加消息的数量,你可能会溢出内存

      【讨论】:

        【解决方案4】:

        Akka 中的默认邮箱没有限制。但是如果你想限制邮箱中的最大消息,你可以在actor中构建一个Akka流,然后可以按需使用OverflowStrategy。

        例如:

        val source: Source[Message, SourceQueueWithComplete[Message]] =
            Source.queue[Message](bufferSize = 8192,
              overflowStrategy = OverflowStrategy.dropNew)
        
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2014-02-12
          • 2010-11-27
          • 2016-04-16
          • 2011-11-06
          • 1970-01-01
          相关资源
          最近更新 更多