【发布时间】:2014-10-15 04:32:19
【问题描述】:
我正在尝试构建一个Conduit,它接收ByteStrings 作为输入(每个块大小约为1kb)并产生连接ByteStrings 的512kb 块作为输出。
这看起来应该很简单,但我遇到了很多麻烦,我尝试使用的大多数策略都只成功地将块分成更小的块,我没有成功连接更大的块。
我开始尝试isolate,然后是takeExactlyE,最后是conduitVector,但无济于事。最终我决定这样做:
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
where
loop buffer n = do
mchunk <- C.await
case mchunk of
Nothing ->
-- Yield last remaining bytes
when (n < chunkSize) (C.yield buffer)
Just chunk -> do
-- Yield when the buffer has been filled and start over
let buffer' = buffer <> BL.fromStrict chunk
l = B.length chunk
if n <= l
then C.yield buffer' >> loop BL.empty chunkSize
else loop buffer' (n - l)
P.S.我决定不为这个函数拆分更大的块,但这只是一个方便的简化。
但是,考虑到所有处理分块的管道函数,这似乎非常冗长[1,2,3,4]。请帮忙!使用组合器肯定有更好的方法来做到这一点,但我缺少一些直觉!
P.P.S.可以像我所做的那样对缓冲区使用惰性字节串吗?我有点不清楚字节串的内部表示以及这是否会有所帮助,特别是因为我使用的是BL.length,我猜它可能会评估thunk?
结论
为了详细说明迈克尔的答案和 cmets,我最终得到了这个管道:
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base)
=> Int
-> C.Conduit ByteString m ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector
我的理解是vectorBuilder 将支付早期连接较小块的成本,将聚合块生成为严格的字节串。
据我所知,当聚合块非常大和/或馈送到自然流接口时,可能需要一种生成惰性字节串块(即 “chunked chunks”)的替代实现像网络套接字。这是我对“惰性字节串”版本的最佳尝试:
import qualified Data.Sequences.Lazy as SL
import qualified Data.Sequences as S
import qualified Data.Conduit.List as CL
-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
=> S.Index lazy
-> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy =$= C.takeE chunkSize
【问题讨论】:
-
另外,令人难以置信的是,我还没有找到一个只做这个的例子......