所以我找到了一个对我有用的答案,它可能对其他人有用。
事实证明,您实际上可以对 Warp 的内部进行足够多的处理来做到这一点,但是您剩下的是 Warp 的基本版本,如果您需要诸如日志记录之类的东西,则需要在上面添加其他包。
另外,请注意所谓的“半关闭”连接(当客户端关闭其发送端,但仍在等待数据时)将被检测为已关闭,从而中断您的计算。我不知道任何处理半关闭连接的 HTTP 客户端,但只是需要注意一些事情。
不管怎样,我首先复制了Network.Wai.Handler.Warp和Network.Wai.Handler.Warp.Internal公开的函数runSettings和runSettingsSocket,并制作了调用我提供的函数而不是WarpI.socketConnection的版本,这样我就有了签名:
runSettings' :: Warp.Settings -> (Socket -> IO (IO WarpI.Connection))
-> Wai.Application -> IO ()
这需要复制一些辅助方法,例如 setSocketCloseOnExec 和 windowsThreadBlockHack。那里的双IO 签名可能看起来很奇怪,但这正是您想要的——外部IO 在主线程中运行(调用accept),内部IO 在每个连接线程中运行在accept 返回后分叉。原来的Warp函数runSettings等价于:
\set -> runSettings' set (WarpI.socketConnection >=> return . return)
然后我做了:
data ClientDisappeared = ClientDisappeared deriving (Show, Eq, Enum, Ord)
instance Exception ClientDisappeared
runSettingsSignalDisconnect :: Warp.Settings -> Wai.Application -> IO ()
runSettingsSignalDisconnect set =
runSettings' set (WarpI.socketConnection >=> return . wrapConn)
where
-- Fork a 'monitor' thread that does nothing but attempt to
-- perform a read from conn in a loop 1/sec, and wrap the receive
-- methods on conn so that they first consume from the stuff read
-- by the monitoring thread. If the monitoring thread sees
-- end-of-file (signaled by an empty string read), raise
-- ClientDisappered on the per-connection thread.
wrapConn conn = do
tid <- myThreadId
nxtBstr <- newEmptyMVar :: IO (MVar ByteString)
semaphore <- newMVar ()
readerCount <- newIORef (0 :: Int)
monitorThread <- forkIO (monitor tid nxtBstr semaphore readerCount)
return $ conn {
WarpI.connClose = throwTo monitorThread ClientDisappeared
>> WarpI.connClose conn
, WarpI.connRecv = newRecv nxtBstr semaphore readerCount
, WarpI.connRecvBuf = newRecvBuf nxtBstr semaphore readerCount
}
where
newRecv :: MVar ByteString -> MVar () -> IORef Int
-> IO ByteString
newRecv nxtBstr sem readerCount =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do w <- tryTakeMVar nxtBstr
case w of
Just w' -> return w'
Nothing -> WarpI.connRecv conn
)
newRecvBuf :: MVar ByteString -> MVar () -> IORef Int
-> WarpI.Buffer -> WarpI.BufSize -> IO Bool
newRecvBuf nxtBstr sem readerCount buf bufSize =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do
(fulfilled, buf', bufSize') <-
if bufSize == 0 then return (False, buf, bufSize)
else
do w <- tryTakeMVar nxtBstr
case w of
Nothing -> return (False, buf, bufSize)
Just w' -> do
let wlen = B.length w'
if wlen > bufSize
then do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') bufSize
putMVar nxtBstr (B.drop bufSize w')
return (True, buf, 0)
else do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') wlen
return (wlen == bufSize, plusPtr buf wlen,
bufSize - wlen)
if fulfilled then return True
else WarpI.connRecvBuf conn buf' bufSize'
)
dropClientDisappeared :: ClientDisappeared -> IO ()
dropClientDisappeared _ = return ()
monitor tid nxtBstr sem st =
catch (monitor' tid nxtBstr sem st) dropClientDisappeared
monitor' tid nxtBstr sem st = do
(hitEOF, readerCount) <- withMVar sem $ \_ -> do
w <- tryTakeMVar nxtBstr
case w of
-- No one picked up our bytestring from last time
Just w' -> putMVar nxtBstr w' >> return (False, 0)
Nothing -> do
w <- WarpI.connRecv conn
putMVar nxtBstr w
readerCount <- readIORef st
return (B.null w, readerCount)
if hitEOF && (readerCount == 0)
-- Don't signal if main thread is also trying to read -
-- in that case, main thread will see EOF directly
then throwTo tid ClientDisappeared
else do threadDelay oneSecondInMicros
monitor' tid nxtBstr sem st
oneSecondInMicros = 1000000