【问题标题】:Scala Akka Consumer/Producer: Return ValueScala Akka 消费者/生产者:返回值
【发布时间】:2015-04-24 02:10:18
【问题描述】:

问题陈述

假设我有一个文件,其中包含逐行处理的句子。就我而言,我需要从这些行中提取命名实体(人员、组织、...)。不幸的是,标记器很慢。因此,我决定并行计算,这样可以独立处理行,并将结果收集在一个中心位置。

当前方法

我目前的方法包括使用单一生产者多消费者概念。但是,我对 Akka 比较陌生,但我认为我的问题描述很适合它的功能。让我给你看一些代码:

制作人

Producer 逐行读取文件并将其发送给Consumer。如果达到总行数限制,它会将结果传播回WordCount

class Producer(consumers: ActorRef) extends Actor with ActorLogging {

  var master: Option[ActorRef] = None

  var result = immutable.List[String]()
  var totalLines = 0
  var linesProcessed = 0


  override def receive = {
    case StartProcessing() => {
      master = Some(sender)

      Source.fromFile("sent.txt", "utf-8").getLines.foreach { line =>
        consumers ! Sentence(line)
        totalLines += 1
      }

      context.stop(self)
    }

    case SentenceProcessed(list) => {
      linesProcessed += 1
      result :::= list
      //If we are done, we can propagate the result to the creator
      if (linesProcessed == totalLines) {
        master.map(_ ! result)
      }
    }

    case _ => log.error("message not recognized")
  }
}

消费者

class Consumer extends Actor with ActorLogging {

  def tokenize(line: String): Seq[String] = {
    line.split(" ").map(_.toLowerCase)
  }

  override def receive = {
    case Sentence(sent) => {
      //Assume: This is representative for the extensive computation method 
      val tokens = tokenize(sent)

      sender() ! SentenceProcessed(tokens.toList)
  }

    case _ => log.error("message not recognized")
  }
}

字数(主)

class WordCount extends Actor {

  val consumers = context.actorOf(Props[Consumer].
    withRouter(FromConfig()).
    withDispatcher("consumer-dispatcher"), "consumers")
  val producer = context.actorOf(Props(new Producer(consumers)), "producer")

  context.watch(consumers)
  context.watch(producer)

  def receive = {
    case Terminated(`producer`) => consumers ! Broadcast(PoisonPill)
    case Terminated(`consumers`) => context.system.shutdown
  }
}

object WordCount {

  def getActor() = new WordCount

  def getConfig(routerType: String, dispatcherType: String)(numConsumers: Int) = s"""
      akka.actor.deployment {
        /WordCount/consumers {
          router = $routerType
          nr-of-instances = $numConsumers
          dispatcher = consumer-dispatcher
        }
      }
      consumer-dispatcher {
        type = $dispatcherType
        executor = "fork-join-executor"
      }"""
}

WordCount 演员负责创建其他演员。当Consumer 完成时,Producer 会发送一条包含所有令牌的消息。但是,如何再次传播消息并接受并等待它?第三个WordCount 演员的架构可能是错误的。

主要例程

case class Run(name: String, actor: () => Actor, config: (Int) => String)

object Main extends App {

  val run = Run("push_implementation", WordCount.getActor _, WordCount.getConfig("balancing-pool", "Dispatcher") _)

  def execute(run: Run, numConsumers: Int) = {

    val config = ConfigFactory.parseString(run.config(numConsumers))

    val system = ActorSystem("Counting", ConfigFactory.load(config))
    val startTime = System.currentTimeMillis

    system.actorOf(Props(run.actor()), "WordCount")
    /*
          How to get the result here?!
    */
    system.awaitTermination

    System.currentTimeMillis - startTime
  }


   execute(run, 4)
}

问题

如您所见,实际问题是将结果传播回Main 例程。你能告诉我如何以正确的方式做到这一点吗?问题也是如何等待结果直到消费者完成?我简要浏览了 Akka Future 文档部分,但整个系统对于初学者来说有点不知所措。像var future = message ? actor 这样的东西似乎很合适。不确定,如何做到这一点。使用WordCount 演员也会导致额外的复杂性。也许有可能想出一个不需要这个演员的解决方案?

【问题讨论】:

    标签: scala akka


    【解决方案1】:

    考虑使用 Akka Aggregator Pattern。这会处理低级原语(观看演员、毒丸等)。您可以专注于管理状态。

    您对system.actorOf() 的调用返回一个ActorRef,但您没有使用它。你应该向那个演员询问结果。像这样的:

    implicit val timeout = Timeout(5 seconds)
    val wCount = system.actorOf(Props(run.actor()), "WordCount")
    val answer = Await.result(wCount ? "sent.txt", timeout.duration)
    

    这意味着您的WordCount 类需要一个接受String 消息的receive 方法。那段代码应该汇总结果并告诉sender(),如下所示:

    class WordCount extends Actor {
        def receive: Receive = {
            case filename: String =>
                // do all of your code here, using filename
                sender() ! results
        }
    }
    

    此外,您可以应用一些技术来处理Futures,而不是使用上面的Await 阻止结果。

    【讨论】:

    • 感谢您的回复!我试图理解并使用您建议的聚合器模式。在这里找到要点link。然而,为了让它发挥作用,我不得不再次使用 PoisenPill。否则,我会收到“遇到死信”异常。欢迎您提出改进建议。我也不清楚模式的某些部分(超时?)。看来,这些例子根本不完整。例如缺少聚合器特征扩展。
    • 看来我的结果不是确定性的。今天我收到“死信”异常。
    • import akka.pattern.ask; import akka.util.Timeout
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多