【问题标题】:Actors Mailbox Overflow. Scala演员邮箱溢出。斯卡拉
【发布时间】:2011-03-10 14:18:14
【问题描述】:

我目前正在与 scala 中的两个演员合作。一,生产者,产生一些数据并将其发送到parcer。生产者通过消息发送HashMap[String,HashMap[Object,List[Int]]](连同this标记发送者):

parcer ! (this,data)

解析器一直在等待这样的消息:

def act(){
    loop{
      react{
        case (producer, data)=> parse(data);
      }
    }
}

程序在正常情况下完美运行。问题来自大量数据和发送的许多消息(哈希大约有 10^4 个元素,内部哈希大约有 100 个元素,列表长度为 100),程序崩溃。它没有显示错误或异常。它只是停止。

问题似乎是我的生产者的工作速度比解析器快得多(目前我不想要多个解析器)。

阅读scala mailbox size limit 后,我想知道我的解析器邮箱是否已达到极限。该帖子还提供了一些解决方案,但我首先需要确保这是问题所在。我该如何测试?

有没有办法知道演员的内存限制?那么读取邮箱中已用/空闲内存呢?

也欢迎您对尚未在that link 中发布的工作流程提出任何建议。

谢谢,

【问题讨论】:

标签: memory scala memory-management actor


【解决方案1】:

首先,您不需要显式传递发送者,因为无论如何发送者都会被 Scala 演员框架跟踪。您始终可以使用sender 方法访问消息的发件人。

从这里可以看出:scala.actors.MQueue,演员的邮箱被实现为一个链表,因此只受堆大小的限制。

不过,如果您担心生产者非常快而消费者非常慢,我建议您探索一种节流机制。但我不推荐问题scala mailbox size limit接受的答案中的方法。

一般来说,在系统压力过大时尝试发送过载消息似乎不是一个好主意。如果您的系统太忙而无法检查过载怎么办?如果过载消息的接收者太忙而无法对其采取行动怎么办?此外,删除消息对我来说听起来不是一个好主意。我认为您希望可靠地处理所有工作项。

另外,我不会依赖mailboxSize 来确定负载。您无法区分不同的消息类型,只能从消费者自身内部检查,而不能从生产者检查。

我建议使用一种方法,让消费者在知道自己可以处理的情况下要求更多的工作。

下面是一个如何实现的简单示例。

import scala.actors._
import Actor._

object ConsumerProducer {
  def main(args: Array[String]) {
    val producer = new Producer(Iterator.range(0, 10000))
    val consumer = new Consumer(producer)
  }
}

case class Produce(count: Int)
case object Finished

class Producer[T](source: Iterator[T]) extends Actor {

  start

  def act() {
    loopWhile(source.hasNext) {
      react {
        case Produce(n: Int) => produce(n)
      } 
    }
  }

  def produce(n: Int) {
    println("producing " + n)
    var remaining = n
    source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 }
    if(!source.hasNext) sender ! Finished
  }
}

class Consumer(producer: Actor) extends Actor {

  start

  private var remaining = 0

  def act() {
    requestWork()
    consume()
  }

  def consume(): Nothing = react {
    case Finished => println("Finished")
    case n: Int => work(n); requestWork(); consume()
  }

  def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) }

  def work(n: Int) = {
    println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
    remaining -= 1
  }
}

【讨论】:

  • 您好 Ruediger,感谢您的回答。所以邮箱没有像我想的那样实现!顺便说一句,您如何访问消息的发件人?
  • 在你的actor里面你有sender方法。它总是返回收到的最后一条消息的发送者。我在我给出的示例中的Producer actor的produce方法中使用了它。
猜你喜欢
  • 1970-01-01
  • 2010-10-30
  • 2019-11-24
  • 1970-01-01
  • 2011-09-16
  • 1970-01-01
  • 2016-01-31
  • 2017-10-23
  • 2013-05-19
相关资源
最近更新 更多