【问题标题】:Scala actors left hangingScala 演员悬而未决
【发布时间】:2012-01-11 04:37:33
【问题描述】:

我正在生成少量参与者来获取、处理 RSS 提要项目并将其保存到数据库中。这是通过在 cron 上运行的对象的 main 方法完成的。我创建这些演员并在他们完成分配给他们的先前工作时分配工作给他们。我的主要课程产生了一个演员,将工作分配给一群演员。最终,主要方法似乎挂起。它没有退出,但所有演员的执行都停止了。我的 CTO 认为主角在演员完成他们的工作并离开他们之前就退出了,但我不相信是这样的。我在 main 上没有收到成功退出(根本没有退出)。

基本上我想知道如何调试这些演员,以及什么可能的原因会导致这种情况发生。在演员完成执行之前主退出(如果确实如此,那有关系吗?)据我所知,使用接收的演员被一对一映射到线程,对吗?代码如下。请提出任何后续问题,非常感谢您的帮助。我知道我可能没有提供足够的细节,我是 scala 和演员的新手,会根据需要更新。

object ActorTester {
  val poolSize = 10
  var pendingQueue :Set[RssFeed] = RssFeed.pendingQueue

  def main(args :Array[String]) {
    val manager = new SpinnerManager(poolSize, pendingQueue)
    manager.start
  }
}

case object Stop

class SpinnerManager(poolSize :Int = 1, var pendingQueue :Set[RssFeed]) extends Actor {
  val pool = new Array[Spinner](poolSize)

  override def start() :Actor = {
    for (i <- 0 to (poolSize - 1)) {
      val spinner = new Spinner(i)
      spinner.start()
      pool(i) = spinner
    }
    super.start
  }

  def act() {
    for {
      s <- pool
      if (!pendingQueue.isEmpty)
     } {
       s ! pendingQueue.head
       pendingQueue = pendingQueue.tail
     }

    while(true) {
      receive {
        case id :Int => {
          if (!pendingQueue.isEmpty) {
            pool(id) ! pendingQueue.head
            pendingQueue = pendingQueue.tail             
          } else if ((true /: pool) { (done, s) => {
            if (s.getState != Actor.State.Runnable) {
              val exited = future {
                s ! Stop
                done && true
              }
              exited()
            } else {
              done && false
            }
          }}) {
            exit
          }
        } 
      }
    }
  }
}

class Spinner(id :Int) extends Actor {
  def act() {
    while(true) {
      receive {
        case dbFeed :RssFeed => {
          //process rss feed
          //this has multiple network requests, to the original blogs, bing image api
          //our instance of solr - some of these spawn their own actors
          sender ! id
        }
        case Stop => exit
      }
    }
  }
}

【问题讨论】:

    标签: scala concurrency jvm actor


    【解决方案1】:

    一方面,当您向左折叠以确定所有 Spinner 演员是否已“终止”时,您犯了一个微小但重要的错误。你应该做的是评估done &amp;&amp; true resp。 done &amp;&amp; false 在 if 案例的末尾,但目前你只说 true resp。 false 不考虑done

    例如,假设有 4 个 Spinner actor,其中第一个和第二个是 Runnable,第三个 不是,第四个又是 Runnable。在这种情况下,您 foldleft 的结果将是true,尽管第三个演员尚未完成。如果您使用的是逻辑 &amp;&amp;,您会得到正确的结果。

    这也可能是导致您的应用程序挂起的原因。

    编辑:还有一个竞争条件的问题。以下代码现在可以使用,希望对您有所帮助。无论如何,我想知道,Scala 的 actor 实现不会自动使用工作线程吗?

    import actors.Actor
    import scala.collection.mutable.Queue
    
    case class RssFeed()
    
    case class Stop()
    
    class Spinner(id: Int) extends Actor {
      def act() {
        loop {
          react {
            case dbFeed: RssFeed => {
              // Process RSS feed
              sender ! id
            }
            case Stop => exit()
          }
        }
      }
    }
    
    class SpinnerManager(poolSize: Int, pendingQueue: Queue[RssFeed]) extends Actor {
      val pool = Array.tabulate(poolSize)(new Spinner(_).start())
    
      def act() {
        for (s <- pool; if (!pendingQueue.isEmpty)) {
          pendingQueue.synchronized {
            s ! pendingQueue.dequeue()
          }
        }
    
        loop {
          react {
            case id: Int => pendingQueue.synchronized {
              if (!pendingQueue.isEmpty) {
                Console println id
                pool(id) ! pendingQueue.dequeue()
              } else {
                if (pool forall (_.getState != Actor.State.Runnable)) {
                  pool foreach (_ ! Stop)
                  exit()
                }
              }
            }
          }
        }
      }
    
    }
    
    object ActorTester {
      def main(args: Array[String]) {
        val poolSize = 10
        val pendingQueue: Queue[RssFeed] = Queue.tabulate(100)(_ => RssFeed())
        new SpinnerManager(poolSize, pendingQueue).start()
      }
    }
    

    【讨论】:

    • 仍然挂起。不过还是谢谢。
    • 谢谢,我会试试这个。是的,它确实使用了工作线程。有什么意义?
    • 我只是在想,也许你正在实施一些已经存在的东西。另一方面,我没有经常使用演员,因此我在这里的“建议”实际上可能是一个糟糕的建议。但是,从我目前的观点来看,我会坚持使用 Spinner,而不使用 SpinnerManager。根据 Scala 主页上的this tutorial,您可能不必担心工作线程。最后由你决定:)
    【解决方案2】:

    所以经过几天的调试,我已经解决了这个问题。 fotNelton 的代码建议对此非常有帮助,所以我给了他一票。然而,他们并没有解决问题本身。我发现如果你在 main 方法中运行它,那么如果父演员在他们的子演员之前退出,那么程序将永远挂起并且永远不会退出,仍然保留它的所有内存。在处理 RSS 提要的过程中,Fetcher 会生成演员并向他们发送消息以执行涉及网络请求的事情。这些演员需要在父演员退出之前完成他们的工作。不过,Fetcher 不会等待这些演员完成,一旦他发送了消息,他就会继续前进。所以他会在他的孩子演员完成所有工作之前告诉经理他已经完成了。为了解决这个问题,一种选择是使用期货并等到演员完成(非常慢)。我的解决方案是创建可通过 URL 访问的服务(POST 到有演员等待反应的服务)。该服务将立即响应,并向其自己的参与者发送消息。因此,一旦参与者向服务发送请求,他们就可以退出,并且不需要产生任何其他参与者。

    object FeedFetcher {
      val poolSize = 10
      var pendingQueue :Queue[RssFeed] = RssFeed.pendingQueue
    
      def main(args :Array[String]) {
        new FetcherManager(poolSize, pendingQueue).start
      }
    }
    
    case object Stop
    
    class FetcherManager(poolSize :Int = 1, var pendingQueue :Queue[RssFeed]) extends Actor {
      val pool = new Array[Fetcher](poolSize)
      var numberProcessed = 0
    
      override def start() :Actor = {
        for (i <- 0 to (poolSize - 1)) {
          val fetcher = new Fetcher(i)
          fetcher.start()
          pool(i) = fetcher
        }
        super.start
      }
    
      def act() {
        for {
          f <- pool
          if (!pendingQueue.isEmpty)
         } {
          pendingQueue.synchronized {
            f ! pendingQueue.dequeue
          }
        }
    
        loop {
          reactWithin(10000L) {
            case id :Int => pendingQueue.synchronized {
              numberProcessed = numberProcessed + 1
              if (!pendingQueue.isEmpty) {
                pool(id) ! pendingQueue.dequeue             
              } else if ((true /: pool) { (done, f) => {
                if (f.getState == Actor.State.Suspended) {
                  f ! Stop
                  done && true
                } else if (f.getState == Actor.State.Terminated) {
                  done && true
                } else {
                  false
                }
              }}) {
                pool foreach { f => {
                  println(f.getState)
                }}
                println("Processed " + numberProcessed + " feeds total.")
                exit
              }
            }
            case TIMEOUT => {
              if (pendingQueue.isEmpty) {
                println("Manager just woke up from timeout with all feeds assigned.")
                pool foreach { f => {
                  if (f.getState == Actor.State.Suspended) {
                    println("Sending Stop to Fetcher " + f.id)
                    f ! Stop
                  }
                }}
                println("Checking state of all Fetchers for termination.")
                if ((true /: pool) { (done, f) => {
                  done && (f.getState == Actor.State.Terminated)
                }}) {
                  exit
                }
              }
            }
          }
        }
      }
    }
    
    class Fetcher(val id :Int) extends Actor {
      var feedsIveDone = 0
      def act() {
        loop {
          react {
            case dbFeed :RssFeed => {
              println("Fetcher " + id + " starting feed")
              //process rss feed here
              feedsIveDone = feedsIveDone + 1
              sender ! id
            }
            case Stop => {
              println(id + " exiting")
              println(feedsIveDone)
              exit
            }
          }
        }
      }
    

    【讨论】:

      猜你喜欢
      • 2011-01-20
      • 2011-01-30
      • 2010-10-07
      • 2014-06-29
      • 2019-04-23
      • 1970-01-01
      • 1970-01-01
      • 2011-02-20
      • 2023-03-30
      相关资源
      最近更新 更多