【问题标题】:haskell pipes - how to repeatedly perform a takeWhile operation on a bytestring pipe?haskell 管道 - 如何在字节串管道上重复执行 takeWhile 操作?
【发布时间】:2016-06-04 15:37:09
【问题描述】:

我想要做的是使用 takeWhile 将一个字节串分割成某个字符。

import qualified Data.ByteString.Internal as BS (c2w, w2c)
import Pipes
import Pipes.ByteString as PB
import Pipes.GZip
import Pipes.Prelude as PP
import System.IO

newline = BS.c2w '\n'

splitter = PB.takeWhile (\myWord -> myWord /= newline)

myPipe fileHandle = PP.toListM $ decompress fileProducer >-> splitter
  where
    fileProducer = PB.fromHandle fileHandle       

run = do
  dat <- withFile "somefile.blob" ReadMode myPipe
  pure dat

这让我获得了第一行,但我真正想要的是一次有效地将每个块产生一个换行符。我该怎么做?

【问题讨论】:

标签: haskell streaming haskell-pipes


【解决方案1】:

@Michael 的回答很好。我只是想说明一些 正在发生的使用模式。

(.lhs 可在http://lpaste.net/165352 获得)

前几个导入:

 {-# LANGUAGE OverloadedStrings, NoMonomorphismRestriction #-}

 import Pipes
 import qualified Pipes.Prelude as PP
 import qualified Pipes.Group as PG
 import qualified Pipes.ByteString as PB
 import qualified Pipes.GZip as GZip
 import qualified Data.ByteString as BS
 import Lens.Family (view, over)
 import Control.Monad
 import System.IO

如果您查看 Pipes.ByteString 和 Pipes.GZip 中的函数 你会看到它们都变成了以下类型的模式:

  1. 制片人...-> FreeT(制片人...)...
  2. FreeT(生产者 ...)... -> 生产者 ...
  3. Lens'(制片人...)(FreeT(制片人...)...)
  4. 制片人...->制片人...

每个类别的功能示例:

  1. PB.words
  2. PG.concats
  3. PB.lines, PB.chunksOf, PB.splits, ...
  4. GZip.compress, GZip.decompress

以下是如何使用PB.words 将输入流拆分为单词:

 prod = yield "this is\na test\nof the pipes\nprocessing\nsystem"

 t1 = runEffect $ (PG.concats . PB.words) prod >-> PP.print

使用类型 3 的函数——例如PB.lines,只需使用view Lens' 获取类型 1 的函数,然后与 PG.concats 组合:

 t2a = runEffect $ (PG.concats . view PB.lines) prod >-> PP.print

 t2b h = (PG.concats . view PB.lines) (PB.fromHandle h) >-> PP.print

 run2 = withFile "input" ReadMode (runEffect . t2b)

对于 Producer -> Producer 函数,只需使用普通函数应用即可:

 t3 h = GZip.decompress (PB.fromHandle h) >-> PP.print

 run3 = withFile "input.gz" ReadMode (runEffect . t3)

 t4 h = GZip.decompress (PB.fromHandle h) >-> PP.map BS.length >-> PP.print

 run4 = withFile "big.gz" ReadMode (runEffect . t4)

要先解压再按行分割,我们嵌套函数 应用:

 t5 h = (PG.concats . view PB.lines) ( GZip.decompress (PB.fromHandle h) )
          >-> PP.map BS.length >-> PP.print

 run5 = withFile "input.gz" ReadMode (runEffect . t5)

【讨论】:

  • 感谢您的解释。为什么某些功能作为镜头视图提供而不是直接提供转换功能?我知道镜头提供了更多的通用性,但除此之外,设计并不明显。镜片在黑线鳕中被描述为“不合适”是什么意思?
  • 这些是stackoverflow.com/users/1026598/gabriel-gonzalez 的好问题!另一个专门询问管道问题的地方是pipes mailing list
【解决方案2】:

pipes-bytestringpipes-group 的排列方式使得重复破坏 Producer ByteString m r 会产生 FreeT (Producer ByteString m) m rFreeT 在这里可以理解为 A_Succession_Of,因此结果可以被认为是“一系列返回 r 的字节串生产者段”。这样,如果其中一个段的长度为 10 GB,we still have streaming rather than a 10 gigabyte strict bytestring

在我看来,您想在换行符上打破字节串生产者,但我不知道您是否想保留换行符。如果您将它们扔掉,这与使用view PB.lines 拆分字节串生产者相同,然后将每个从属生产者连接成一个严格的字节串 - 单独的行。我在下面写了这个accumLines。这很简单,但是稍微使用了Lens.view 将花哨的PB.lines 镜头变成了常规功能。 (许多操作在pipes-bytestring 中被写成镜头,因为这样它们可以被重新用于其他目的,尤其是那种解析pipes 的生产者。)

import Pipes
import qualified Pipes.Prelude as P
import Pipes.ByteString as PB
import qualified Pipes.Group as PG
import Pipes.GZip

import qualified Data.ByteString.Internal as BS (c2w, w2c)

import System.IO
import Lens.Simple (view) -- or Control.Lens or whatever
import Data.Monoid

main = run >>= mapM_ print

myPipe fileHandle = P.toListM $ accumLines (decompress fileProducer)
  where
    fileProducer = PB.fromHandle fileHandle

run = do
  dat <- withFile "a.gz" ReadMode myPipe
  pure dat

-- little library additions

accumLines :: Monad m => Producer ByteString m r -> Producer ByteString m r
accumLines = mconcats . view PB.lines 

accumSplits :: Monad m => Char -> Producer ByteString m r -> Producer ByteString m r
accumSplits c  = mconcats . view (PB.splits (BS.c2w c)) 

-- this is convenient, but the operations above could 
-- be more rationally implemented using e.g. BL.fromChunks and toListM 
mconcats :: (Monad m, Monoid b) => FreeT (Producer b m) m r -> Producer b m r
mconcats = PG.folds (<>) mempty id

理想情况下,您不会在每个换行符处编写新的字节串。是否必须取决于您要对这些线条做什么。

【讨论】:

  • PG.concats 和你的mconcats 一样吗?
  • 不,PG.concats 只是删除了连续中的FreeT 中断,因此它没有幺半群约束。与concat 的类比遵循一般的类比FreeT (Producer m) m r : Producer a m r :: [[a]] : [a] 我的小mconcats 将每个连续的monoidal 值生成器压缩为一个汇总的monoidal 值——也就是说,它执行了许多类似于mconcat 的小操作。也许这不是最好的名字。
  • 我在解析器(可能是 attoparsec 或 megaparsec)中每 n 行运行一次以构建记录。似乎我必须构建单独的字节串(为此,每条记录至少一个)。还是有办法避免这种情况?
  • 如果您编写了一个 attoparsec 解析器,它在给定的 n 行段上成功并返回一条记录,那么您可以重复地将其直接应用于字节串生产者,而无需将其分成几行。见Pipes.Attoparsec.decoded。因此,parsed my_n_line_parser . decompress 将流式传输来自压缩文件的记录;如果失败,它将返回带有消息的字节串的其余(解压缩)生产者。如果它适合任务,这可能是最简单的。
猜你喜欢
  • 2014-11-16
  • 2023-02-22
  • 2021-12-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-10-04
相关资源
最近更新 更多