【问题标题】:Is there a limit to how many Akka Streams can run at the same time?可以同时运行多少个 Akka Streams 有限制吗?
【发布时间】:2019-08-05 16:48:55
【问题描述】:

我正在尝试使用BroadcastHub 实现一个简单的一对多发布/订阅模式。对于大量订阅者来说,这会默默地失败,这让我觉得我可以运行的流数量达到了一些限制。

首先,让我们定义一些事件:

sealed trait Event
case object EX extends Event
case object E1 extends Event
case object E2 extends Event
case object E3 extends Event
case object E4 extends Event
case object E5 extends Event

我已经使用BroadcastHub 实现了发布者,每次我想添加一个新订阅者时添加一个Sink.actorRefWithAck。发布EX 事件结束广播:

trait Publisher extends Actor with ActorLogging {
  implicit val materializer = ActorMaterializer()

  private val sourceQueue = Source.queue[Event](Publisher.bufferSize, Publisher.overflowStrategy)
  private val (
    queue: SourceQueueWithComplete[Event],
    source: Source[Event, NotUsed]
  ) = {
    val (q,s) = sourceQueue.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both).run()
    s.runWith(Sink.ignore)
    (q,s)
  }

  def publish(evt: Event) = {
    log.debug("Publishing Event: {}", evt.getClass().toString())
    queue.offer(evt)
    evt match {
      case EX => queue.complete()
      case _ => Unit
    }
 }

  def subscribe(actor: ActorRef, ack: ActorRef): Unit =
    source.runWith(
      Sink.actorRefWithAck(
        actor,
        onInitMessage = Publisher.StreamInit(ack),
        ackMessage = Publisher.StreamAck,
        onCompleteMessage = Publisher.StreamDone,
        onFailureMessage = onErrorMessage))

  def onErrorMessage(ex: Throwable) = Publisher.StreamFail(ex)

  def publisherBehaviour: Receive = {
    case Publisher.Subscribe(sub, ack) => subscribe(sub, ack.getOrElse(sender()))
    case Publisher.StreamAck => Unit
  }

  override def receive = LoggingReceive { publisherBehaviour }
}

object Publisher {
  final val bufferSize = 5
  final val overflowStrategy = OverflowStrategy.backpressure

  case class Subscribe(sub: ActorRef, ack: Option[ActorRef])

  case object StreamAck
  case class StreamInit(ack: ActorRef)
  case object StreamDone
  case class StreamFail(ex: Throwable)
}

订阅者可以实现Subscriber trait 来分离逻辑:

trait Subscriber {
  def onInit(publisher: ActorRef): Unit = ()
  def onInit(publisher: ActorRef, k: KillSwitch): Unit = onInit(publisher)
  def onEvent(event: Event): Unit = ()
  def onDone(publisher: ActorRef, subscriber: ActorRef): Unit = ()
  def onFail(e: Throwable, publisher: ActorRef, subscriber: ActorRef): Unit = ()
}

actor 逻辑很简单:

class SubscriberActor(subscriber: Subscriber) extends Actor with ActorLogging {

  def subscriberBehaviour: Receive = {
    case Publisher.StreamInit(ack) => {
      log.debug("Stream initialized.")
      subscriber.onInit(sender())
      sender() ! Publisher.StreamAck
      ack.forward(Publisher.StreamInit(ack))
    }
    case Publisher.StreamDone => {
      log.debug("Stream completed.")
      subscriber.onDone(sender(),self)
    }
    case Publisher.StreamFail(ex) => {
      log.error(ex, "Stream failed!")
      subscriber.onFail(ex,sender(),self)
    }
    case e: Event => {
      log.debug("Observing Event: {}",e)
      subscriber.onEvent(e)
      sender() ! Publisher.StreamAck
    }
  }

  override def receive = LoggingReceive { subscriberBehaviour }
}

其中一个关键点是所有订阅者都必须接收到发布者发送的所有消息,因此我们必须知道所有流都已物化并且所有参与者都准备好在开始广播之前接收。这就是StreamInit 消息被转发给另一个用户提供的参与者的原因。

为了测试这一点,我定义了一个简单的MockPublisher,当被告知这样做时,它只会广播一个事件列表:

class MockPublisher(events: Event*) extends Publisher {
  def receiveBehaviour: Receive = {
    case MockPublish => events map publish
  }
  override def receive = LoggingReceive { receiveBehaviour orElse publisherBehaviour }
}
case object MockPublish

我还定义了一个MockSubscriber,它只计算它看到了多少事件:

class MockSubscriber extends Subscriber {
  var count = 0
  val promise = Promise[Int]()
  def future = promise.future

  override def onInit(publisher: ActorRef): Unit = count = 0
  override def onEvent(event: Event): Unit = count += 1
  override def onDone(publisher: ActorRef, subscriber: ActorRef): Unit = promise.success(count)
  override def onFail(e: Throwable, publisher: ActorRef, subscriber: ActorRef): Unit = promise.failure(e) 
}

还有一个订阅的小方法:

object MockSubscriber {
  def sub(publisher: ActorRef, ack: ActorRef)(implicit system: ActorSystem): Future[Int] = {
    val s = new MockSubscriber()
    implicit val tOut = Timeout(1.minute)
    val a = system.actorOf(Props(new SubscriberActor(s)))

    val f = publisher ! Publisher.Subscribe(a, Some(ack))

    s.future
  }
}

我将所有内容放在一个单元测试中:

class SubscriberTests extends TestKit(ActorSystem("SubscriberTests")) with
    WordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender {

  override def beforeAll:Unit = {
    system.eventStream.setLogLevel(Logging.DebugLevel)
  }
  override def afterAll:Unit = {
    println("Shutting down...")
    TestKit.shutdownActorSystem(system)
  }

  "The Subscriber" must {
    "publish events to many observers" in {
      val n = 9

      val p = system.actorOf(Props(new MockPublisher(E1,E2,E3,E4,E5,EX)))

      val q = scala.collection.mutable.Queue[Future[Int]]()

      for (i <- 1 to n) {
        q += MockSubscriber.sub(p,self)
      }

      for (i <- 1 to n) {
        expectMsgType[Publisher.StreamInit](70.seconds)
      }
      p ! MockPublish

      q.map { f => Await.result(f, 10.seconds) should be (6) }
    }
  }
}

对于相对较小的 n 值,此测试成功,但对于 val n = 90000 等值则失败。任何地方都不会出现捕获或未捕获的异常,Java 也不会出现任何内存不足的投诉(如果我走得更高,确实会发生这种情况)。

我错过了什么?

编辑:在具有不同规格的多台计算机上进行了尝试。调试信息显示,一旦n 足够高,任何订阅者都不会收到任何消息。

【问题讨论】:

    标签: scala akka scalability akka-stream


    【解决方案1】:

    Akka Stream(以及任何其他反应式流,实际上)为您提供背压。如果您没有搞砸创建消费者的方式(例如,允许创建 1GB JSON,只有在将其提取到内存后才会将其切成小块),您应该有一个舒适的环境,您可以考虑您的内存使用情况上限很大(因为背压如何管理推拉机制)。一旦你测量了你的上限在哪里,你就可以设置你的 JVM 和容器内存,这样你就可以让它运行而不用担心内存不足错误(前提是你的 JVM 中没有发生其他可能导致内存的事情使用高峰)。

    因此,从这里我们可以看出,您可以并行运行多少流是有一些限制的 - 具体来说,您只能在内存允许的范围内运行它们。 CPU 不应该是一个限制(因为您将有多个线程),但如果您将在一台机器上启动太多线程,那么 CPU 不可避免地必须在不同的流之间切换,从而使它们中的每一个都变慢。它可能不是技术障碍,但您最终可能会遇到处理速度如此之慢以至于无法实现其业务目的的情况(尽管,我想您必须同时运行多个流) .

    在您的测试中,您可能还会遇到一些其他问题。例如。如果您在某些阻塞操作中重用与 Actor System 相同的线程池,而没有通知线程池它们正在阻塞,您可能会遇到死锁(事实上,您应该运行所有 IO 阻塞操作在与“计算”操作不同的线程池上)。同时发生 90000(!) 个并发事件(并且可能具有相同的小线程池)几乎可以保证会遇到问题(我猜你可能会遇到问题,即使你直接在期货上运行代码而不是演员)。在这里,您在测试中使用 Actor 系统,AFAIR 使用阻塞逻辑仅突出显示所有可能的问题,这些问题与将阻塞和非阻塞任务保持在同一个位置的小线程池。

    【讨论】:

    • 如果我的虚拟机内存用完了,这会默默地发生,这使得它成为 Akka Streams 的问题。进一步增加流的数量确实让我得到了OutOfMemoryError,但这是意料之中的。上面的代码卡住了,没有异常显示。另外,我贴的代码除了在主线程中等待最终结果外,没有其他阻塞或效率瓶颈。
    • 这与标题中的问题无关,但您有多个从同一个队列读取的接收器,例如s.runWith(Sink.ignore) 中的 Sing.ignore - 它可能在所有其他流之前从队列中读取事件(您跨流共享队列...),因此所有其他流可能永远不会收到事件。
    • 源是BroadcastHub,它允许你多次实现源。见:doc.akka.io/docs/akka/current/stream/…
    猜你喜欢
    • 2016-04-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-01-12
    • 2017-09-28
    • 1970-01-01
    相关资源
    最近更新 更多