【问题标题】:Scala actor: receiveWithin() doesn't receive messagesScala 演员:receiveWithin() 不接收消息
【发布时间】:2011-12-05 02:34:38
【问题描述】:

我正在 Scala 中构建一个基于参与者的服务,消费者可以在其中查询客户端是否被授权,也可以授权客户端。

如果消费者查询客户端的授权状态并且该客户端尚未获得授权,则参与者应在指定的超时时间内等待传入的Authorize 消息,然后发送回复。 IsAuthorized 应该能够在消费者代码中同步执行,以便它阻塞并等待回复。类似的东西

service !? IsAuthorized(client) => {
  case IsAuthorizedResponse(_, authorized) => // do something
}

但是我的演员中的receiveWithin() 从来没有收到消息并且总是遇到超时。

这是我的代码

case object WaitingForAuthorization
case class WaitingForAuthorizationResponse(clients: immutable.Set[Client])
case class IsAuthorized(client: Client)
case class IsAuthorizedResponse(client: Client, authorized: Boolean)
case class Authorize(client: Client)

class ClientAuthorizationService {
  private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
  private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]

  def actor = Actor.actor {
    loop {
      react {
        case IsAuthorized(client: Client) => reply {
          if (authorized contains client) {
            IsAuthorizedResponse(client, true)
          } else {
            waiting += client
            var matched = false;
            val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT)

            while (!matched && Instant.now.isBefore(end)) {
              // ERROR HERE: Never receives Authorize messages
              receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
                case Authorize(authorizedClient: Client) => {
                  authorizeClient(authorizedClient)
                  if (authorizedClient == client) matched = true
                }
                case TIMEOUT => // do nothing since we handle the timeout in the while loop
              }
            }

            IsAuthorizedResponse(client, matched)
          }
        }

        case Authorize(client: Client) => authorizeClient(client)
        case WaitingForAuthorization => reply {
          WaitingForAuthorizationResponse(immutable.Set() ++ waiting)
        }
      }
    }
  }

  private def authorizeClient(client: Client) = synchronized {
    authorized += client
    waiting -= client
  }
}

object ClientAuthorizationService {
  val AUTH_TIMEOUT: Long = 60 * 1000;
}

当我将Authorize 消息发送给处于receiveWithin 块中的actor 时,消息被下面的第二个case 语句捕获,该语句实际上应该只在当时没有人等待响应时捕获这些消息.

我的代码有什么问题?

更新:

以下是相关代码的简化版本,它实际上代表了一个更简单和不同的逻辑,但可能更好地阐明了问题:

loop {
  react {
    case IsAuthorized(client: Client) => reply {
      var matched = false

      // In the "real" logic we would actually loop here until either the
      // authorized client matches the requested client or the timeout is hit.
      // For the sake of the demo we only take the first Authorize message.

      receiveWithin(60*1000) {
        // Although Authorize is send to actor it's never caught here
        case Authorize(authorizedClient: Client) => matched = authorizedClient == client
        case TIMEOUT => 
      }

      IsAuthorizedResponse(client, matched)
    }

    case Authorize(client: Client) => // this case is hit
  }
}

更新 2:

我终于解决了这个问题。我认为问题在于演员在尝试在对前面的IsAuthorized 消息的回复中接收Authorize 消息时被阻止。

我重写了代码,以便在我们等待Authorized 时启动匿名Actor。下面是代码,感兴趣的朋友可以看看。 waitingMap[Client, Actor]

loop {
  react {
    case IsAuthorized(client: Client) =>
      if (authorized contains client) {
        sender ! IsAuthorizedResponse(client, true)
      } else {
        val receipient = sender
        // Start an anonymous actor that waits for an Authorize message
        // within a given timeout and sends a reply to the consumer.
        // The actor will be notified by the parent actor below.
        waiting += client -> Actor.actor {
          val cleanup = () => {
            waiting -= client
            exit()
          }

          receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
            case Authorize(c) =>
              receipient ! IsAuthorizedResponse(client, true)
              cleanup()
            case TIMEOUT =>
              receipient ! IsAuthorizedResponse(client, false)
              cleanup()
          }
        }
      }

    case Authorize(client: Client) =>
      authorized += client

      waiting.get(client) match {
        case Some(actor) => actor ! Authorize(client)
        case None =>
      }

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
  }
}

如果有更好的方法来解决这个问题,请告诉我!

【问题讨论】:

  • 你能清理/缩短代码并只发布相关部分吗?

标签: scala concurrency actor


【解决方案1】:

不是回复问题吗?在

case IsAuthorized(client: Client) => reply { ... }

所有代码都在回复块的参数中,因此在实际发送回复之前执行(包括receiveWithing)。这意味着当您的客户处理您的回复时,您将不再等待它。

在您的原始代码中,它可能应该类似于

case IsAuthorized(client: Client) =>
  if(ok) reply(AuthorizedReply(client, true))
  else {
     reply(AuthorizedReply(client, false))
     receiveWithin(...)
  }

【讨论】:

  • "所以它在实际发送回复之前被执行(包括receiveWithing)"这正是我想要的:-) 回复应该是阻塞的,直到客户端被授权或超时命中.这段代码的灵感来自这个例子here(参见“第二个例子”一章的第三个代码sn-p)。您的代码也不正确,因为如果客户端尚未获得授权,您将立即发送带有“false”的回复,然后等待传入的授权消息。
【解决方案2】:

我终于解决了这个问题。我认为问题在于演员在尝试在前面的IsAuthorized 消息的回复中接收Authorize 消息时被阻止。

我重写了代码,以便在我们等待Authorized 时启动匿名Actor。下面是代码供有兴趣的人参考。 waitingMap[Client, Actor]

loop {
  react {
    case IsAuthorized(client: Client) =>
      if (authorized contains client) {
        sender ! IsAuthorizedResponse(client, true)
      } else {
        val receipient = sender
        // Start an anonymous actor that waits for an Authorize message
        // within a given timeout and sends a reply to the consumer.
        // The actor will be notified by the parent actor below.
        waiting += client -> Actor.actor {
          val cleanup = () => {
            waiting -= client
            exit()
          }

          receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
            case Authorize(c) =>
              receipient ! IsAuthorizedResponse(client, true)
              cleanup()
            case TIMEOUT =>
              receipient ! IsAuthorizedResponse(client, false)
              cleanup()
          }
        }
      }

    case Authorize(client: Client) =>
      authorized += client

      waiting.get(client) match {
        case Some(actor) => actor ! Authorize(client)
        case None =>
      }

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
  }
}

如果有更好的方法来解决这个问题,请告诉我!

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-06-19
    • 2012-10-31
    • 1970-01-01
    • 2013-11-29
    • 2012-11-15
    • 2018-09-29
    • 2016-09-04
    相关资源
    最近更新 更多