【问题标题】:Sequential Binary Data Decoding Using Conduits使用管道的顺序二进制数据解码
【发布时间】:2012-09-24 16:13:21
【问题描述】:

目标是拥有一个具有以下类型签名的管道

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a

管道应重复解析通过 TCP/IP(使用 network-conduit 包)接收的协议缓冲区(使用 ByteString -> a 函数)。

电报格式为

{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...

(大括号不是协议的一部分,仅用于分隔实体)。

第一个想法是使用sequenceSink 重复应用能够解析一个ProtoBuf 的Sink

[...]
import qualified Data.Binary         as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util   as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lenBytes <- CB.take 4                                -- read protobuf length
           let len :: Word32
               len = B.decode lengthBytes                       -- decode ProtoBuf length
               intLen = fromIntegral len
           protobufBytes <- CB.take intLen                      -- read the ProtoBuf bytes
           return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf

它不起作用(仅适用于第一个协议缓冲区),因为似乎有许多“剩余”字节已从源读取但未通过 CB.take 消耗被丢弃。

而且我发现没有办法将“其余部分推回源头”。

我的概念完全错了吗?

PS:即使我在这里使用协议缓冲区,问题也与协议缓冲区无关。为了调试问题,我总是使用{length}{UTF8 encoded string}{length}{UTF8 encoded string}... 和与上述类似的管道 (utf8StringConduit :: MonadResource m =&gt; Conduit ByteString m Text)。

更新:

我只是尝试用剩余字节替换状态(上面示例中没有状态()),并通过调用首先消耗已读取字节(来自状态)的函数替换CB.take调用,并且仅在需要时调用await(当状态不够大时)。不幸的是,这也不起作用,因为只要 Source 没有剩余字节,sequenceSink 就不会执行代码,但状态仍然包含剩余的字节:-(。

如果您对代码感兴趣(没有优化或非常好但应该足以测试):

utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
    CU.sequenceSink [] $ \st ->
        do (lengthBytes, st') <- takeWithState BS.empty st 4
           let len :: Word32
               len = B.decode $ BSL.fromChunks [lengthBytes]
               intLength = fromIntegral len
           (textBytes, st'') <- takeWithState BS.empty st' intLength
           return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]

takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
    let stateLenSum = foldl' (+) 0 $ map BS.length state
     in if stateLenSum >= neededLen
           then do let (firstChunk:state') = state
                       (neededChunk, pushBack) = BS.splitAt neededLen firstChunk
                       acc' = acc `BS.append` neededChunk
                       neededLen' = neededLen - BS.length neededChunk
                       state'' = if BS.null pushBack
                                    then state'
                                    else pushBack:state'
                   takeWithState acc' state'' neededLen'
           else do aM <- await
                   case aM of
                     Just a -> takeWithState acc (state ++ [a]) neededLen
                     Nothing -> error "to be fixed later"

【问题讨论】:

  • 用于编码长度的四个字节是否可能包含在长度中?这将导致您使用protobufBytes &lt;- CB.take intLen 读取四个额外的字节。
  • @MattS ,抱歉,我没有正确理解您的问题?长度中包含的长度是什么意思?
  • 在您指定的字节流中,每个protobuf 前面都有4 个字节,表示protobuf 的长度(以字节数计)。我最初的理论是这 4 个字节实际上可能代表 protobuf 加上 4 字节长度标头的长度。这将导致您的代码读取超过 protobuf 实际结尾的 4 个字节,错误地消耗应该代表下一个 protobuf 长度的 4 个字节。
  • 是的,它们是下一个PB的大小。

标签: haskell protocol-buffers conduit


【解决方案1】:

对于协议缓冲区解析和序列化,我们使用messageWithLengthPutMmessageWithLengthGetM(见下文),但我假设它对长度使用 varint 编码,这不是您需要的。我可能会尝试通过将messageWithLength Get/Put 替换为类似

的东西来调整我们的实现
myMessageWithLengthGetM = 
   do size <- getWord32be 
      getMessageWithSize size

但我不知道如何使用协议缓冲区包中的可用函数来实现getMessageWithSize。另一方面,您可以只 getByteString 然后“重新解析”字节串。

关于管道:您是否尝试过在没有Data.Conduit.Util 的情况下实现管道?类似的东西

protobufConduit protobufDecode = loop
   where
      loop = 
         do len <- liftM convertLen (CB.take 4)
            bs <- CB.take len
            yield (protobufDecode bs)
            loop

这是我们使用的代码:

pbufSerialize :: (ReflectDescriptor w, Wire w) => Conduit w IO ByteString
pbufSerialize = awaitForever f
    where f pb = M.mapM_ yield $ BSL.toChunks $ runPut (messageWithLengthPutM pb)

pbufParse :: (ReflectDescriptor w, Wire w, Show w) => Conduit ByteString IO w
pbufParse = new
    where
      new = read (runGet messageWithLengthGetM . BSL.fromChunks . (:[]))
      read parse =
          do mbs <- await
             case mbs of
               Just bs -> checkResult (parse bs)
               Nothing -> return ()
      checkResult result =
          case result of
            Failed _ errmsg -> fail errmsg
            Partial cont -> read (cont . Just . BSL.fromChunks . (:[]))
            Finished rest _ msg ->
                do yield msg
                   checkResult (runGet messageWithLengthGetM rest)

【讨论】:

  • 感谢@David Leuschner,因为我在通道的两边都使用了 Haskell,所以我只是更改了我的协议,现在使用您的 pbufSerializepbufParse 函数的代码:-)。不使用CU.sequenceSink 的想法至关重要。谢谢!
猜你喜欢
  • 1970-01-01
  • 2014-08-09
  • 2013-09-20
  • 2021-12-05
  • 1970-01-01
  • 2014-09-02
  • 1970-01-01
  • 1970-01-01
  • 2011-11-02
相关资源
最近更新 更多