【问题标题】:Is it possible to define a thread within a function of an actor in Akka Actor?是否可以在 Akka Actor 的 Actor 函数中定义线程?
【发布时间】:2021-01-19 05:45:53
【问题描述】:

假设我在 Akka Actor 中有一个带有方法的 Actor 类:

class SecureActor extends Actor{
  def methodWithThread(): Unit={
}

可以在这个方法中运行一个新线程吗? Akka自己的线程不会有问题吗?

class ThreadExample extends Thread{  
    override def run(){  
      println("Thread is running...");  
}
def methodWithThread(): Unit={
  var t = new ThreadExample()  
  t.start()
}

【问题讨论】:

  • 您将破坏 Akka 为您提供的并发安全性。您提出的这种情况通常是从一个单亲产生多个演员子作为解决方案。
  • 谢谢。我想确定这一点。你可以在答案中更详细地展示它,以便我接受吗? @费利佩
  • 您能否详细说明您为什么要这样做?听起来有点像XY problem
  • @yuval-itzchakov 我想阻止演员,只留下线程向其他演员发送一些控制消息。
  • 阻止一个actor有点违背Akka的核心原则。为什么需要屏蔽演员?演员可以直接将消息发送给子演员吗?

标签: multithreading scala concurrency akka akka-actor


【解决方案1】:

既然您说您正在寻找如何确保以这种方式从 Akka Actor 创建异步任务会破坏它提供给您的 Akka 并发模式,这里有一些非常简单的示例,基于您的类。

这不是我在评论中建议的演示如何从父演员生成子演员的答案。正如你所说,你有兴趣展示如何打破 Akka 并发模式。我建议您寻找如何做到这一点的简单示例。也许this example 有帮助。

import akka.actor.{Actor, ActorSystem, Props}

import scala.util.Random

object BreakingAkkaConcurrency {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("BreakingAkkaConcurrency")
    val unsecureActor = actorSystem.actorOf(Props[UnsecureActor], "unsecureActor")
    unsecureActor ! "1"
    unsecureActor ! "2"
    unsecureActor ! "3"
    unsecureActor ! "4"
    unsecureActor ! "5"
    unsecureActor ! "6"
    unsecureActor ! "7"
    unsecureActor ! "8"
    unsecureActor ! "9"
    unsecureActor ! "10"

    Thread.sleep(10000)
  }
}

class UnsecureActor extends Actor {
  def methodWithThread(): Unit = {
  }

  override def receive: Receive = {
    case msg: String =>
      var t = new ThreadExample(msg)
      // by starting a new thread inside an Akka actor you break the synchronous pattern provided by Akka
      t.start()
  }
}

class ThreadExample(id: String) extends Thread {
  override def run() {
    // simulate a random computation which will potentially break the order of messages processed by Akka actors
    // This is where the Akka actors don't have control anymore.
    Thread.sleep(Random.nextInt(10) * 1000)
    println(s"finished $id");
  }
}

输出的顺序与unsecureActor ! "1" 发送消息的顺序不同。

finished 9
finished 4
finished 1
finished 7
finished 3
finished 2
finished 8
finished 5
finished 6
finished 10

【讨论】:

  • 感谢您的宝贵时间。我运行了您的代码,收到消息的顺序是相同的。如果我确保这种随机性不会发生,我还能使用它吗?由于我使用优先级消息队列,因此消息的顺序在我的项目中并不重要。
  • 正如@Tim 回答的那样,是的,你可以做到,没有什么能阻止你以这种方式实施。但是,一旦 actor 内部存在可能破坏它的代码,您就会破坏 akka 并发安全性。我的意思是,一旦你或其他处理相同代码的程序员忘记了它,并在没有并发保证的情况下从你的代码中启动一个线程,代码就会编译并运行。也许将来有一天你会在结果上发现一些问题,而这将是一个寻找错误的难题。那么,我的问题是:如果你不利用 Akka 的优势,你到底为什么要使用它?
  • 抱歉回复晚了。我得到了它。这是一个基于 Akka 的学术研究项目,我认为我的导师认为 Akka 的全部优势并不重要。
【解决方案2】:

可以在 Actor 内部的单独线程中启动代码(例如使用 Future),但您必须小心它如何与 Akka 交互。线程不应读取或修改 Actor 中的任何可变状态,它只能通过发送消息与 Actor 进行通信。

【讨论】:

  • 因此,换句话说,如果不使用具有不可变状态的对象,则在 Actor 内部创建一个线程(即使有或没有Future)可能会创造更多的可能性来破坏分布式系统中的并发性.考虑到更多的人将使用相同的代码,危险会增加更多......
  • @Felipe 是的,使用线程可能会破坏并发性。但可以通过降低风险同时保留收益的方式来做到这一点。
  • 在一个基于研究的项目中,我必须阻止参与者接收一些控制消息(我知道不建议在 Akka 中阻止)。在Await.result 期间之前,我想在我的actor 中运行一个线程来向其他actor 发送一些控制消息。 @蒂姆
  • 永远不要在演员的接收方法中调用Await.result。将onComplete 处理程序添加到向参与者发送包含操作结果的消息的对象。或者有一个单独的线程来处理控制消息并将它们转发给 Actor。
猜你喜欢
  • 2023-04-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-09-09
  • 2013-10-05
  • 1970-01-01
  • 2018-07-17
相关资源
最近更新 更多