【问题标题】:Rechunk a conduit into larger chunks using combinators使用组合器将管道重新分块成更大的块
【发布时间】:2014-10-15 04:32:19
【问题描述】:

我正在尝试构建一个Conduit,它接收ByteStrings 作为输入(每个块大小约为1kb)并产生连接ByteStrings 的512kb 块作为输出。

这看起来应该很简单,但我遇到了很多麻烦,我尝试使用的大多数策略都只成功地将块分成更小的块,我没有成功连接更大的块。

我开始尝试isolate,然后是takeExactlyE,最后是conduitVector,但无济于事。最终我决定这样做:

import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
  where 
    loop buffer n = do
      mchunk <- C.await
      case mchunk of 
        Nothing -> 
          -- Yield last remaining bytes
          when (n < chunkSize) (C.yield buffer)
        Just chunk -> do
          -- Yield when the buffer has been filled and start over
          let buffer' = buffer <> BL.fromStrict chunk
              l       = B.length chunk
          if n <= l
          then C.yield buffer' >> loop BL.empty chunkSize
          else loop buffer' (n - l)

P.S.我决定不为这个函数拆分更大的块,但这只是一个方便的简化。

但是,考虑到所有处理分块的管道函数,这似乎非常冗长[1,2,3,4]。请帮忙!使用组合器肯定有更好的方法来做到这一点,但我缺少一些直觉!

P.P.S.可以像我所做的那样对缓冲区使用惰性字节串吗?我有点不清楚字节串的内部表示以及这是否会有所帮助,特别是因为我使用的是BL.length,我猜它可能会评估thunk?


结论

为了详细说明迈克尔的答案和 cmets,我最终得到了这个管道:

import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base) 
         => Int 
         -> C.Conduit ByteString m ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector

我的理解是vectorBuilder 将支付早期连接较小块的成本,将聚合块生成为严格的字节串。

据我所知,当聚合块非常大和/或馈送到自然流接口时,可能需要一种生成惰性字节串块(即 “chunked chunks”)的替代实现像网络套接字。这是我对“惰性字节串”版本的最佳尝试:

import qualified Data.Sequences.Lazy        as SL
import qualified Data.Sequences             as S
import qualified Data.Conduit.List          as CL

-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
          => S.Index lazy
          -> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy =$= C.takeE chunkSize

【问题讨论】:

  • 另外,令人难以置信的是,我还没有找到一个只做这个的例子......

标签: haskell chunking conduit


【解决方案1】:

这个怎么样?

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

chunksOfAtLeast :: Monad m => Int -> Conduit ByteString m LByteString
chunksOfAtLeast chunkSize =
    loop
  where
    loop = do
        lbs <- takeCE chunkSize =$= sinkLazy
        unless (null lbs) $ do
            yield lbs
            loop

main :: IO ()
main =
    yieldMany ["hello", "there", "world!"]
        $$ chunksOfAtLeast 3
        =$ mapM_C print

根据您的目标,您可以采取许多其他方法。如果你想有一个严格的缓冲区,那么使用 vectorBuilder 的 blaze-builder 会很有意义。但这会保留您已经拥有的相同类型签名。

【讨论】:

  • 谢谢!!我认为多种方法是真正让我感到困惑的事情之一。我希望像C.takeExactlyE chunkSize C.concat 这样的东西可以工作,但我仍然不确定我是否理解它为什么不...?
  • 哦,或者更确切地说,我觉得我应该能够写出与forever $ C.takeExactlyE chunkSize C.concat 非常相似的东西,只是forever 不会终止……
  • 我认为你自己回答了你的forever 点:forever 永远不会终止。关于takeExactlyE vs takeE:唯一的区别是当下游没有完全消耗输入,但sinkLazy 确实总是消耗它的所有输入,所以两者在这里可以互换使用. fold 不是concat,而是您要查找的组合器,它只会将输出类型从LByteString 更改为ByteString
  • 关于fold:我可以想到三种不同的方式将ByteStrings: 聚合成一个懒惰的ByteString,将它们折叠在一起,或者转换为Builder 并连接Builder。它们都是有效的,这取决于你想对输出做什么。当然可以将某种组合器放在一起,例如takeForever。您还提到了使用conduitVector的可能性。
  • vectorBuilder 上的另一个版本可能会更快一些:lpaste.net/8310511707713699840
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-10-26
  • 1970-01-01
  • 1970-01-01
  • 2016-03-04
  • 1970-01-01
  • 2017-02-09
相关资源
最近更新 更多