【发布时间】:2012-09-24 16:13:21
【问题描述】:
目标是拥有一个具有以下类型签名的管道
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
管道应重复解析通过 TCP/IP(使用 network-conduit 包)接收的协议缓冲区(使用 ByteString -> a 函数)。
电报格式为
{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
(大括号不是协议的一部分,仅用于分隔实体)。
第一个想法是使用sequenceSink 重复应用能够解析一个ProtoBuf 的Sink:
[...]
import qualified Data.Binary as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util as CU
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
CU.sequenceSink () $ \() ->
do lenBytes <- CB.take 4 -- read protobuf length
let len :: Word32
len = B.decode lengthBytes -- decode ProtoBuf length
intLen = fromIntegral len
protobufBytes <- CB.take intLen -- read the ProtoBuf bytes
return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf
它不起作用(仅适用于第一个协议缓冲区),因为似乎有许多“剩余”字节已从源读取但未通过 CB.take 消耗被丢弃。
而且我发现没有办法将“其余部分推回源头”。
我的概念完全错了吗?
PS:即使我在这里使用协议缓冲区,问题也与协议缓冲区无关。为了调试问题,我总是使用{length}{UTF8 encoded string}{length}{UTF8 encoded string}... 和与上述类似的管道 (utf8StringConduit :: MonadResource m => Conduit ByteString m Text)。
更新:
我只是尝试用剩余字节替换状态(上面示例中没有状态()),并通过调用首先消耗已读取字节(来自状态)的函数替换CB.take调用,并且仅在需要时调用await(当状态不够大时)。不幸的是,这也不起作用,因为只要 Source 没有剩余字节,sequenceSink 就不会执行代码,但状态仍然包含剩余的字节:-(。
如果您对代码感兴趣(没有优化或非常好但应该足以测试):
utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
CU.sequenceSink [] $ \st ->
do (lengthBytes, st') <- takeWithState BS.empty st 4
let len :: Word32
len = B.decode $ BSL.fromChunks [lengthBytes]
intLength = fromIntegral len
(textBytes, st'') <- takeWithState BS.empty st' intLength
return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]
takeWithState :: Monad m
=> ByteString
-> [ByteString]
-> Int
-> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
let stateLenSum = foldl' (+) 0 $ map BS.length state
in if stateLenSum >= neededLen
then do let (firstChunk:state') = state
(neededChunk, pushBack) = BS.splitAt neededLen firstChunk
acc' = acc `BS.append` neededChunk
neededLen' = neededLen - BS.length neededChunk
state'' = if BS.null pushBack
then state'
else pushBack:state'
takeWithState acc' state'' neededLen'
else do aM <- await
case aM of
Just a -> takeWithState acc (state ++ [a]) neededLen
Nothing -> error "to be fixed later"
【问题讨论】:
-
用于编码长度的四个字节是否可能包含在长度中?这将导致您使用
protobufBytes <- CB.take intLen读取四个额外的字节。 -
@MattS ,抱歉,我没有正确理解您的问题?长度中包含的长度是什么意思?
-
在您指定的字节流中,每个
protobuf前面都有4 个字节,表示protobuf的长度(以字节数计)。我最初的理论是这 4 个字节实际上可能代表protobuf加上 4 字节长度标头的长度。这将导致您的代码读取超过protobuf实际结尾的 4 个字节,错误地消耗应该代表下一个protobuf长度的 4 个字节。 -
是的,它们是下一个PB的大小。
标签: haskell protocol-buffers conduit