【问题标题】:unicast in Play framework and SSE (scala): how do i know which stream to send to?Play框架和SSE(scala)中的单播:我怎么知道要发送到哪个流?
【发布时间】:2014-08-06 20:01:29
【问题描述】:

我的应用列出了主机,并且该列表是动态且不断变化的。它基于 Akka 演员和服务器发送事件。 当新客户端连接时,他们需要获取当前列表以显示。但是,我不想每次连接新客户端时都将列表推送给所有客户端。因此,遵循实时弹性搜索示例并通过为每个 Connect() 创建一个 (Enumerator, Channel) 并为其提供 UUID 来模拟单播。当我需要广播时,我将映射所有内容并更新它们,以便能够向客户端进行单播(并且应该很少)。

我的问题是 - 我如何获得新客户端的 UUID 以便它可以使用它?我正在寻找的流程是: - 客户要求 EventStream - 服务器用 UUID 创建一个新的(Enumerator, channel),并将 Enumerator 和 UUID 返回给客户端 - 客户使用 uuid 请求表格 - 服务器只在对应 uuid 的通道上推送表

那么,客户端如何知道 UUID?如果是 Web 套接字,发送请求应该会得到预期的结果,因为它会到达自己的通道。但在 SSE 中,客户端 -> 服务器是在不同的通道上完成的。有什么解决办法吗?

代码 sn-ps:

case class Connected(uuid: UUID, enumerator: Enumerator[ JsValue ] )

trait MyActor extends Actor{
  var channelMap = new HashMap[UUID,(Enumerator[JsValue], Channel[JsValue])]

  def connect() = {
    val con = Concurrent.broadcast[JsValue]
    val uuid = UUID.randomUUID()
    channelMap += (uuid -> con)
    Connected(uuid, con._1)
  }
...
}

object HostsActor extends MyActor {
...
  override def receive = {
    case Connect => {
      sender ! connect
    }
...
}

object Actors {
  def hostsStream = {
    getStream(getActor("hosts", Props (HostsActor)))
  }

  def getActor(actorPath: String, actorProps : Props): Future[ActorRef] = {
    /* some regular code to create a new actor if the path does not exist, or return the existing one else */
  }


  def getStream(far: Future[ActorRef]) = {
    far flatMap {ar =>
      (ar ? Connect).mapTo[Connected].map { stream =>
        stream
      }
    }
  }
...
}

object AppController extends Controller {
  def getHostsStream = Action.async {
    Actors.hostsStream map { ac =>
************************************
**  how do i use the UUID here??  **
************************************
      Ok.feed(ac.enumerator &> EventSource()).as("text/event-stream")
    }
  }

【问题讨论】:

    标签: scala playframework-2.0 akka actor server-sent-events


    【解决方案1】:

    我设法通过在返回频道后异步推送 uuid 来解决它,中间有一段时间:

      override def receive = {
        case Connect => {
          val con = connect()
          sender ! con
          import scala.concurrent.ExecutionContext.Implicits.global 
          context.system.scheduler.scheduleOnce(0.1 seconds){
            unicast(
              con.uuid, 
              JsObject (
                Seq (
                  "uuid" -> JsString(con.uuid.toString)   
                )    
              ) 
            )
          }
        }
    

    这实现了它的目标 - 客户端获得了 UUID 并能够缓存并使用它来将 getHostsList 推送到服务器:

      @stream = new EventSource("/streams/hosts")
      @stream.addEventListener "message", (event) =>
        data = JSON.parse(event.data)
        if data.uuid
          @uuid = data.uuid
          $.ajax
            type: 'POST',
            url: "/streams/hosts/" + @uuid + "/sendlist"
            success: (data) ->
              console.log("sent hosts request to server successfully")
            error: () ->
              console.log("failed sending hosts request to server")
        else
          ****************************
          *                          *
          *  handle parsing hosts    *
          *                          *
          *                          *
          ****************************
          @view.render()
    

    虽然这有效,但我必须说我不喜欢它。引入人为延迟,以便客户端可以获取频道并开始收听(我尝试过没有延迟,但客户端没有获取 uuid)是危险的,因为如果系统变得更忙,它可能仍然会错过,但是让它太长了伤害反应性方面。

    如果有人有一个可以同步完成的解决方案 - 将 uuid 作为原始 eventSource 请求的一部分返回 - 我会非常乐意将我的解决方案降级。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多