【问题标题】:Haskell / Conduit: read file line by lineHaskell / Conduit:逐行读取文件
【发布时间】:2019-06-05 11:21:36
【问题描述】:

场景:我有一个约 900mb 的文本文件,其格式如下

...
Id:   109101
ASIN: 0806978473
  title: The Beginner's Guide to Tai Chi
  group: Book
  salesrank: 672264
  similar: 0
  categories: 3
   |Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|General[16575]
   |Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|Taichi[16583]
   |Books[283155]|Subjects[1000]|Sports[26]|General[11086921]
  reviews: total: 2  downloaded: 2  avg rating: 5
    2000-4-4  cutomer: A191SV1V1MK490  rating: 5  votes:   0  helpful:   0
    2004-7-10  cutomer:  AVXBUEPNVLZVC  rating: 5  votes:   0  helpful:   0
                    (----- empty line ------)    
Id :

并想从中解析信息。

问题:作为第一步(因为我需要它用于另一个上下文)我想逐行处理文件,然后将属于一个产品的“块”收集在一起,然后处理它们与其他逻辑分开。

所以计划如下:

  1. 定义代表文本文件的源
  2. 定义一个管道 (?),每个管道从该源取一条线,然后...
  3. ... 将其传递给其他一些组件。

现在,我正在尝试修改以下示例:

doStuff = do
  writeFile "input.txt" "This is a \n test." -- Filepath -> String -> IO ()

  runConduitRes                  -- m r
    $ sourceFileBS "input.txt"   -- ConduitT i ByteString m ()  -- by "chunk"
    .| sinkFile "output.txt"     -- FilePath -> ConduitT ByteString o m ()

  readFile "output.txt"
    >>= putStrLn 

所以sourceFileBS "input.txt"ConduitT i ByteString m () 类型,即具有

  • 输入类型i
  • 输出类型ByteStream
  • 单子类型t
  • 结果类型()

sinkFile 将所有传入数据流式传输到给定文件中。 sinkFile "output.txt" 是输入类型为ByteStream 的管道。

我现在想要的是逐行处理输入源,即每个下游只传递一行。在伪代码中:

sourceFile "input.txt"
splitIntoLines
yieldMany (?)
other stuff

我该怎么做?

我目前拥有的是

copyFile = do
  writeFile "input.txt" "This is a \n test." -- Filepath -> String -> IO ()

  runConduitRes                  -- m r
    (lineC $ sourceFileBS "input.txt")   -- ConduitT i ByteString m ()  -- by "chunk"
    .| sinkFile "output.txt"     -- FilePath -> ConduitT ByteString o m ()

  readFile "output.txt"
    >>= putStrLn --

但这会产生以下类型错误:

    * Couldn't match type `bytestring-0.10.8.2:Data.ByteString.Internal.ByteString'
                     with `Void'
      Expected type: ConduitT
                       ()
                       Void
                       (ResourceT
                          (ConduitT
                             a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
                       ()
        Actual type: ConduitT
                       ()
                       bytestring-0.10.8.2:Data.ByteString.Internal.ByteString
                       (ResourceT
                          (ConduitT
                             a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
                       ()
    * In the first argument of `runConduitRes', namely
        `(lineC $ sourceFileBS "input.txt")'
      In the first argument of `(.|)', namely
        `runConduitRes (lineC $ sourceFileBS "input.txt")'
      In a stmt of a 'do' block:
        runConduitRes (lineC $ sourceFileBS "input.txt")
          .| sinkFile "output.txt"
   |
28 |     (lineC $ sourceFileBS "input.txt")   -- ConduitT i ByteString m ()  -- by "chunk"
   |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

这让我相信现在的问题是第一个管道没有与runConduitRes兼容的输入类型。

我只是无法理解它,真的需要一个提示。

非常感谢。

【问题讨论】:

  • 提示:不要只是告诉我们您遇到了类型错误。报告类型错误。请在上面的部分中,而不是 cmets 部分。
  • 当然。我编辑了帖子。
  • 我不确定,但请尝试 runConduitRes $ (lineC $ ...) .| ...。否则,您将两个参数传递给runConduitRes,第一个是函数lineC
  • 谢谢,这看起来很合理,但还没有完全解决我的问题。我已经编辑了我的帖子。
  • @ngmir 在您编辑的示例中,您有runConduitRes (lineC $ sourceFileBS "input.txt") .| sinkFile "output.txt"。我认为您可能在runConduitRes 之后缺少$:否则您将尝试运行(lineC $ sourceFileBS "input.txt") 而不是整个管道。

标签: haskell conduit


【解决方案1】:

我今天为此苦苦挣扎,并在试图找出类似问题时发现了这个问题。我试图将 git 日志分成块以进行进一步解析,例如

commit 12345
Author: Me
Date:   Thu Jan 25 13:45:16 2019 -0500

    made some changes

 1 file changed, 10 insertions(+), 0 deletions(-)

commit 54321
Author: Me
...and so on...

我需要的函数几乎是来自Data.Conduit.CombinatorssplitOnUnBounded,但我不太清楚如何在那里编写谓词函数。

我想出了以下Conduit,它是对splitOnUnbounded 的轻微修改。 source 它将采用一系列列表。每个列表只有一行文本,因为我发现这样考虑更容易一些,尽管这肯定不是最佳解决方案。

它将使用一个函数将文本行组合在一起,该函数采用 next 行并返回 Bool 指示下一行是否是下一组文本的开始。


groupLines :: (Monad m, MonadIO m) => (Text -> Bool) -> [T.Text] -> ConduitM Text [Text] m ()
groupLines startNextLine ls = start
  where
    -- If the next line in the stream is Nothing, return.
    -- If the next line is the stream is Just line, then
    --   accumulate that line
    start = await >>= maybe (return ()) (accumulateLines ls)
    accumulateLines ls nextLine = do
      -- if ls is [], then add nextLine. Try to get a new next line. If there isn't one, yield. If there is a next line,
      --     yield lines and call accumulatelines again.
      -- if ls is [Text], check if nextLine is the start of the next group. If it isn't, add nextLine to ls,
      --    try got the the next nextLine. if there isn't one, yield, and if there is one, call accumulate lines again.
      --    If nextLine _is_ the start of the next group, the yield this group of lines and call accumulate lines again.
      nextLine' <- await
      case nextLine' of
        Nothing -> yield ls'
        Just l ->
          if Prelude.null ls
            then accumulateLines ls' l
            else
              if startNextLine l
                then yield ls' >> accumulateLines [] l
                else accumulateLines ls' l
      where
        ls' = ls ++ [nextLine]

它可以用在像下面这样的管道中。只需在 Text -&gt; Bool 函数上方传递函数,该函数告诉管道何时开始下一个文本集合。


isCommitLine :: Text -> Bool
isCommitLine t = listToMaybe (TS.indices "commit" t) == Just 0

logParser =
  sourceFile "logs.txt"
    .| decodeUtf8
    .| linesUnbounded
    .| groupLines isCommitLine []
    .| Data.Conduit.Combinators.map (intercalate "\n")
    -- do something with each log entry here --
    .| Data.Conduit.Combinators.print

main :: IO ()
main = runConduitRes logParser

我是 Haskell 的新手,我强烈怀疑这不是完成此任务的最佳方式。所以如果其他人有更好的建议,我会很乐意学习!否则,也许在这里发布这个解决方案会帮助别人。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-06-16
    • 1970-01-01
    • 2014-06-21
    相关资源
    最近更新 更多