【问题标题】:In Haskell, how can I abort a calculation when a web client disconnects在 Haskell 中,如何在 Web 客户端断开连接时中止计算
【发布时间】:2018-02-12 22:34:04
【问题描述】:

我有一个基于 Haskell 的 Web 服务,它执行的计算对于某些输入可能需要很长时间才能完成。 (这里的“真的很长”是指超过一分钟)

因为执行该计算会占用服务器上所有可用的 CPU,所以我将传入的请求放在一个队列中(嗯,实际上是一个堆栈,原因与典型客户端有关,但这不是重点)当它们到达时在当前运行的计算完成时为它们服务。

我的问题是客户端并不总是等待足够长的时间,有时会超时,断开连接并尝试不同的服务器(好吧,他们再次尝试并点击 elb,通常会得到不同的实例) .此外,有时 Web 客户端要求的计算会因为外部因素而过时,并且 Web 客户端会被终止。

在这些情况下,我真的希望能够在我将下一个请求从堆栈中拉出并开始(昂贵的)计算之前检测到 Web 客户端已经消失。不幸的是,我对snap 的经验让我相信在该框架中没有办法询问“客户端的 TCP 连接是否仍然连接?”而且我还没有找到任何涵盖“客户端断开连接”案例的其他 Web 框架的文档。

那么有没有一个 Haskell Web 框架可以很容易地检测 Web 客户端是否已断开连接?或者如果做不到这一点,是否有一个至少可以使它成为可能?

(我知道在所有情况下,如果不向另一端发送数据,可能无法绝对确定 TCP 客户端是否仍然存在;但是,当客户端实际向服务器和服务器的框架发送 RST 数据包时不让应用程序代码确定连接已断开,这是个问题)


顺便说一句,虽然有人可能会怀疑 warp's onClose 处理程序会让你这样做,但只有在响应准备好并写入客户端时才会触发,因此作为中止正在进行的计算的一种方式是无用的。似乎也没有办法访问接受的套接字以设置 SO_KEEPALIVE 或类似的。 (有方法可以访问初始侦听套接字,但不是接受的)

【问题讨论】:

  • 我怀疑这是一个特定于 Haskell 的问题。这可能会对您有所帮助:stackoverflow.com/a/6162238/1651941
  • 它可能不是特定于 haskell 的,但我正在寻找能够检测到这一点的 haskell web 框架。但无论如何,我不是在处理网络浏览器,而是想知道页面何时关闭。我正在处理具有打开的 TCP 连接到服务器等待对 POST 的响应的 HTTP 客户端。这与该问题所解决的完全不同。
  • 啊,好吧。我以为它只是基于浏览器的。如果您在 TCP 协议级别找到解决方案,我想该解决方案应该可以轻松转换为 Haskell。让我们来看看。 :)
  • 可以选择使用 websockets 吗?
  • @epsilonhalbe 这意味着比 202 响应的建议更复杂的客户端设计。我不明白为什么我不能在 Haskell 中获得与 ClientDisconnectedTo‌​kenResponse.IsClientConnected 在 .Net 世界中获得的相同的东西。

标签: haskell tcp haskell-snap-framework


【解决方案1】:

所以我找到了一个对我有用的答案,它可能对其他人有用。

事实证明,您实际上可以对 Warp 的内部进行足够多的处理来做到这一点,但是您剩下的是 Warp 的基本版本,如果您需要诸如日志记录之类的东西,则需要在上面添加其他包。

另外,请注意所谓的“半关闭”连接(当客户端关闭其发送端,但仍在等待数据时)将被检测为已关闭,从而中断您的计算。我不知道任何处理半关闭连接的 HTTP 客户端,但只是需要注意一些事情。

不管怎样,我首先复制了Network.Wai.Handler.WarpNetwork.Wai.Handler.Warp.Internal公开的函数runSettingsrunSettingsSocket,并制作了调用我提供的函数而不是WarpI.socketConnection的版本,这样我就有了签名:

runSettings' :: Warp.Settings -> (Socket -> IO (IO WarpI.Connection))
             -> Wai.Application -> IO ()

这需要复制一些辅助方法,例如 setSocketCloseOnExecwindowsThreadBlockHack。那里的双IO 签名可能看起来很奇怪,但这正是您想要的——外部IO 在主线程中运行(调用accept),内部IO 在每个连接线程中运行在accept 返回后分叉。原来的Warp函数runSettings等价于:

\set -> runSettings' set (WarpI.socketConnection >=> return . return)

然后我做了:

data ClientDisappeared = ClientDisappeared deriving (Show, Eq, Enum, Ord)
instance Exception ClientDisappeared

runSettingsSignalDisconnect :: Warp.Settings -> Wai.Application -> IO ()
runSettingsSignalDisconnect set =
  runSettings' set (WarpI.socketConnection >=> return . wrapConn)
  where
    -- Fork a 'monitor' thread that does nothing but attempt to
    -- perform a read from conn in a loop 1/sec, and wrap the receive
    -- methods on conn so that they first consume from the stuff read
    -- by the monitoring thread. If the monitoring thread sees
    -- end-of-file (signaled by an empty string read), raise
    -- ClientDisappered on the per-connection thread.
    wrapConn conn = do
      tid <- myThreadId
      nxtBstr <- newEmptyMVar :: IO (MVar ByteString)
      semaphore <- newMVar ()
      readerCount <- newIORef (0 :: Int)
      monitorThread <- forkIO (monitor tid nxtBstr semaphore readerCount)
      return $ conn {
        WarpI.connClose = throwTo monitorThread ClientDisappeared
                          >> WarpI.connClose conn
        , WarpI.connRecv = newRecv nxtBstr semaphore readerCount
        , WarpI.connRecvBuf = newRecvBuf nxtBstr semaphore readerCount
        }
      where
        newRecv :: MVar ByteString -> MVar () -> IORef Int
                -> IO ByteString
        newRecv nxtBstr sem readerCount =
          bracket_
          (atomicModifyIORef' readerCount $ \x -> (succ x, ()))
          (atomicModifyIORef' readerCount $ \x -> (pred x, ()))
          (withMVar sem $ \_ -> do w <- tryTakeMVar nxtBstr
                                   case w of
                                     Just w' -> return w'
                                     Nothing -> WarpI.connRecv conn
          )

        newRecvBuf :: MVar ByteString -> MVar () -> IORef Int
                   -> WarpI.Buffer -> WarpI.BufSize -> IO Bool
        newRecvBuf nxtBstr sem readerCount buf bufSize =
          bracket_
          (atomicModifyIORef' readerCount $ \x -> (succ x, ()))
          (atomicModifyIORef' readerCount $ \x -> (pred x, ()))
          (withMVar sem $ \_ -> do
              (fulfilled, buf', bufSize') <-
                if bufSize == 0 then return (False, buf, bufSize)
                else
                  do w <- tryTakeMVar nxtBstr
                     case w of
                       Nothing -> return (False, buf, bufSize)
                       Just w' -> do
                         let wlen = B.length w'
                         if wlen > bufSize
                           then do BU.unsafeUseAsCString w' $ \cw' ->
                                     copyBytes buf (castPtr cw') bufSize
                                   putMVar nxtBstr (B.drop bufSize w')
                                   return (True, buf, 0)
                           else do BU.unsafeUseAsCString w' $ \cw' ->
                                     copyBytes buf (castPtr cw') wlen
                                   return (wlen == bufSize, plusPtr buf wlen,
                                           bufSize - wlen)
              if fulfilled then return True
                else WarpI.connRecvBuf conn buf' bufSize'
          )
        dropClientDisappeared :: ClientDisappeared -> IO ()
        dropClientDisappeared _ = return ()
        monitor tid nxtBstr sem st =
          catch (monitor' tid nxtBstr sem st) dropClientDisappeared

        monitor' tid nxtBstr sem st = do
          (hitEOF, readerCount) <- withMVar sem $ \_ -> do
            w <- tryTakeMVar nxtBstr
            case w of
              -- No one picked up our bytestring from last time
              Just w' -> putMVar nxtBstr w' >> return (False, 0)
              Nothing -> do
                w <- WarpI.connRecv conn
                putMVar nxtBstr w
                readerCount <- readIORef st
                return (B.null w, readerCount)
          if hitEOF && (readerCount == 0)
            -- Don't signal if main thread is also trying to read -
            -- in that case, main thread will see EOF directly
            then throwTo tid ClientDisappeared
            else do threadDelay oneSecondInMicros
                    monitor' tid nxtBstr sem st
        oneSecondInMicros = 1000000

【讨论】:

    【解决方案2】:

    假设“Web 服务”是指基于 HTTP(S) 的客户端,一种选择是使用 RESTful 方法。服务可以接受请求并返回202 Accepted,而不是假设客户端将保持连接。正如HTTP status code specification 所述:

    请求已被接受处理,但处理尚未完成 [...]

    202 响应是故意不置可否的。它的目的是允许服务器接受对其他进程的请求(可能是一个每天只运行一次的面向批处理的进程),而不需要用户代理与服务器的连接持续到进程完成。与此响应一起返回的实体应包含请求当前状态的指示以及指向状态监视器的指针或用户可以预期何时完成请求的一些估计。

    服务器立即以202 Accepted 响应进行响应,并且还包含一个可供客户端用于轮询状态的 URL。一种选择是将此 URL 放在响应的 Location 标头中,但您也可以将 URL 放在响应正文中的链接中。

    客户端可以轮询状态 URL 以获取状态。计算完成后,状态资源可以提供指向完成结果的链接。

    如果您担心客户端会过于频繁地轮询,可以将缓存标头添加到状态资源和最终结果中。

    REST in Practice 概述了一般概念,而RESTful Web Services Cookbook 有很多很好的细节。

    我并不是说你不能用 HTTP 或 TCP/IP 做某事(我不知道),但如果你不能,那么上面是一个经过验证的类似解决方案问题。

    显然,这完全独立于编程语言,但我的经验是REST and algebraic data types go well together

    【讨论】:

    • 不幸的是,这使客户端变得非常复杂 - 在这种情况下,当客户端确定客户端正在使用现在过时的信息时,我们不能依赖某些监督过程来简单地杀死客户端。此外,在计算开始之前,通常很难预测此输入是否会在 0.1 秒、5 秒或 50 秒内完成。我真的不需要从那种提交和投票设计中获得的对计算的额外细粒度控制。我只需要一种简单的方法来检测客户不再等待答案。
    • @DanielMartin 够公平的。我(根本)不确定这个答案是否有用,但这正是我很容易想到的。如果它不适合你的场景,那就不适合。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2010-11-01
    • 1970-01-01
    • 2020-08-27
    • 2014-01-24
    • 1970-01-01
    相关资源
    最近更新 更多