【问题标题】:Haskell Conduit Aeson: Parsing Large JSONs and filter matching key/valuesHaskell Conduit Aeson:解析大型 JSON 并过滤匹配的键/值
【发布时间】:2018-02-28 06:42:57
【问题描述】:

我在 Haskell 中编写了一个应用程序,它执行以下操作:

  1. 递归列出目录,
  2. 从目录列表中解析 JSON 文件,
  3. 寻找匹配的键值对,并且
  4. 返回找到匹配项的文件名。

我的这个应用程序的第一个版本是我能写的最简单、幼稚的版本,但我注意到空间使用量似乎单调增加。

因此,我切换到conduit,现在我的主要功能如下所示:

conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
conduitFilesFilter projFilter dirname' = do
  (_, allFiles) <- listDirRecur dirname'
  C.runConduit $
    C.yieldMany allFiles
    .| C.filterMC (filterMatchingFile projFilter)
    .| C.sinkList

现在我的应用程序的内存使用量有限,但仍然很慢。对此,我有两个问题。

1)

我使用stack new 生成骨架来创建这个应用程序,它默认使用ghc 选项-threaded -rtsopts -with-rtsopts=-N

(对我而言)令人惊讶的是,当我实际运行该应用程序时,它使用了所有可用的处理器(目标机器中大约有 40 个)。但是,我没有编写要并行运行的应用程序的任何部分(实际上我考虑过)。

什么是并行运行的?

2)

此外,大多数 JSON 文件都非常大(10mb),可能有 500k 需要遍历。这意味着由于所有 Aeson 解码,我的程序非常慢。我的想法是并行运行我的filterMatchingFile 部分,但是查看stm-conduit 库,我看不到在少数处理器上并行运行此中间操作的明显方法。

谁能建议一种使用stm-conduit 或其他方式巧妙地并行化我上面的函数的方法?


编辑

我意识到我可以将我的readFile -&gt; decodeObject -&gt; runFilterFunction 分解为conduit 的单独部分,然后我可以在有界通道中使用stm-conduit。也许我会试一试...


我使用+RTS -s 运行我的应用程序(我将其重新配置为-N4),我看到以下内容:

 115,961,554,600 bytes allocated in the heap
  35,870,639,768 bytes copied during GC
      56,467,720 bytes maximum residency (681 sample(s))
       1,283,008 bytes maximum slop
             145 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0     108716 colls, 108716 par   76.915s  20.571s     0.0002s    0.0266s
  Gen  1       681 colls,   680 par    0.530s   0.147s     0.0002s    0.0009s

  Parallel GC work balance: 14.99% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.007s elapsed)
  MUT     time   34.813s  ( 42.938s elapsed)
  GC      time   77.445s  ( 20.718s elapsed)
  EXIT    time    0.000s  (  0.010s elapsed)
  Total   time  112.260s  ( 63.672s elapsed)

  Alloc rate    3,330,960,996 bytes per MUT second

  Productivity  31.0% of total user, 67.5% of total elapsed

gc_alloc_block_sync: 188614
whitehole_spin: 0
gen[0].sync: 33
gen[1].sync: 811204

【问题讨论】:

  • 1) 几乎可以肯定没有。您需要明确并行性(Haskell 只是让您的代码更容易并行化 - 它不适合您)。
  • 嗨@Alec。这正是我的想法,但是当我查看 htop 中的进程时,子进程与 CPU 一样多,并且每个子进程似乎都在做某事(主要是“S”)。这就是提出这个问题的原因。它只是在做并行 GC(这是一回事吗?)?
  • .| 替换为buffer',将runConduit 替换为runCConduit
  • @erewok 并行 GC 绝对是一回事。 (它甚至在你的统计数据中就这么说!)我不是 100% 肯定,但 RTS 可能也在为 I/O 管理做一些线程。我怀疑您的大部分实际用户代码是并行的。
  • @MathematicalOrchid 感谢您的解释。这就是我认为可能会发生的事情。

标签: json haskell aeson conduit


【解决方案1】:

从您的程序描述来看,它没有理由增加内存使用量。我认为这是由于错过了惰性计算而导致的意外内存泄漏。这可以通过堆分析轻松检测到:https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/profiling.html#hp2ps-rendering-heap-profiles-to-postscript。其他可能的原因是运行时不会将所有内存释放回操作系统。直到某个阈值,它将保持与处理的最大文件成比例的内存。如果通过进程 RSS 大小进行跟踪,这可能看起来像是内存泄漏。

-A32m 选项增加托儿所的大小。它允许您的程序在触发垃圾收集之前分配更多内存。统计数据显示,在 GC 期间保留的内存非常少,因此它发生的频率较低,程序花在实际工作上的时间更多。

【讨论】:

    【解决方案2】:

    在 Haskell Cafe 的 Michael Snoyman 的提示下,他指出我的第一个版本并没有真正利用 Conduit 的流媒体功能,我重写了我的 Conduit 版本的应用程序(不使用 stm-conduit)。这是一个很大的改进:我的第一个 Conduit 版本正在对所有数据进行操作,而我没有意识到这一点。

    我还增加了托儿所的大小,这通过减少垃圾收集的频率提高了我的工作效率。

    我修改后的函数最终看起来像这样:

    module Search where
    
    import           Conduit               ((.|))
    import qualified Conduit               as C
    import           Control.Monad
    import           Control.Monad.IO.Class   (MonadIO, liftIO)
    import           Control.Monad.Trans.Resource (MonadResource)
    import qualified Data.ByteString       as B
    import           Data.List             (isPrefixOf)
    import           Data.Maybe            (fromJust, isJust)
    import           System.Path.NameManip (guess_dotdot, absolute_path)
    import           System.FilePath       (addTrailingPathSeparator, normalise)
    import           System.Directory      (getHomeDirectory)
    
    import           Filters
    
    
    sourceFilesFilter :: (MonadResource m, MonadIO m) => ProjectFilter -> FilePath -> C.ConduitM () String m ()
    sourceFilesFilter projFilter dirname' =
        C.sourceDirectoryDeep False dirname'
        .| parseProject projFilter
    
    parseProject :: (MonadResource m, MonadIO m) => ProjectFilter -> C.ConduitM FilePath String m ()
    parseProject (ProjectFilter filterFunc) = do
      C.awaitForever go
      where
        go path' = do
          bytes <- liftIO $ B.readFile path'
          let isProj = validProject bytes
          when (isJust isProj) $ do
            let proj' = fromJust isProj
            when (filterFunc proj') $ C.yield path'
    

    我的主要只是运行管道并打印通过过滤器的管道:

    mainStreamingConduit :: IO ()
    mainStreamingConduit = do
      options <- getRecord "Search JSON Files"
      let filterFunc = makeProjectFilter options
      searchDir <- absolutize (searchPath options)
      itExists <- doesDirectoryExist searchDir
      case itExists of
        False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
        True -> C.runConduitRes $ sourceFilesFilter filterFunc searchDir .| C.mapM_ (liftIO . putStrLn)
    

    我是这样运行的(通常没有统计信息):

    stack exec search-json -- --searchPath $FILES --name NAME +RTS -s -A32m -n4m
    

    在不增加托儿所规模的情况下,我的生产率提高了 30% 左右。然而,有了上面的内容,它看起来像这样:

      72,308,248,744 bytes allocated in the heap
         733,911,752 bytes copied during GC
           7,410,520 bytes maximum residency (8 sample(s))
             863,480 bytes maximum slop
                 187 MB total memory in use (27 MB lost due to fragmentation)
    
                                         Tot time (elapsed)  Avg pause  Max pause
      Gen  0       580 colls,   580 par    2.731s   0.772s     0.0013s    0.0105s
      Gen  1         8 colls,     7 par    0.163s   0.044s     0.0055s    0.0109s
    
      Parallel GC work balance: 35.12% (serial 0%, perfect 100%)
    
      TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
    
      SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
    
      INIT    time    0.001s  (  0.006s elapsed)
      MUT     time   26.155s  ( 31.602s elapsed)
      GC      time    2.894s  (  0.816s elapsed)
      EXIT    time   -0.003s  (  0.008s elapsed)
      Total   time   29.048s  ( 32.432s elapsed)
    
      Alloc rate    2,764,643,665 bytes per MUT second
    
      Productivity  90.0% of total user, 97.5% of total elapsed
    
    gc_alloc_block_sync: 3494
    whitehole_spin: 0
    gen[0].sync: 15527
    gen[1].sync: 177
    

    我仍然想弄清楚如何并行化 filterProj . parseJson . readFile 部分,但现在我对此感到满意。

    【讨论】:

      【解决方案3】:

      我想出了如何使用 stm-conduit 运行这个应用程序,并得到了 Haskell wiki 关于并行性的一些帮助,还有一个 Stack Overflow answer 谈到了在 main 退出之前等待线程结束。

      它的工作方式是我创建一个通道来保存所有要操作的文件名。然后,我分叉了一堆线程,每个线程运行一个Conduit,文件路径通道为Source。我跟踪所有子线程并等待它们完成。

      也许这个解决方案对其他人有用?

      并不是我所有的低级过滤器函数都存在,但它的要点是我有一个 Conduit 来测试一些 JSON。如果它通过,那么它yields 和FilePath

      这是我的主要内容:

      {-# LANGUAGE DeriveGeneric     #-}
      {-# LANGUAGE OverloadedStrings #-}
      
      
      module Main where
      
      import           Conduit                      ((.|))
      import qualified Conduit                      as C
      import           Control.Concurrent
      import           Control.Monad                (forM_)
      import           Control.Monad.IO.Class       (liftIO)
      import           Control.Concurrent.STM
      import           Control.Monad.Trans.Resource (register)
      
      import qualified Data.Conduit.TMChan          as STMChan
      import           Data.Maybe                   (isJust, fromJust)
      import qualified Data.Text                    as T
      import           Options.Generic
      import           System.Directory            (doesDirectoryExist)
      import           System.Exit
      
      import           Search
      
      
      data Commands =
        Commands { searchPath  :: String
                 , par         :: Maybe Int
                 , project     :: Maybe T.Text
                 , revision    :: Maybe T.Text
                 } deriving (Generic, Show)
      
      instance ParseRecord Commands
      
      makeProjectFilter :: Commands -> ProjectFilter
      makeProjectFilter options =
        let stdFilts = StdProjectFilters
              (ProjName <$> project options)
              (Revision <$> revision options)
        in makeProjectFilters stdFilts
      
      main :: IO ()
      main = do
        options <- getRecord "Search JSON Files"
        -- Would user like to run in parallel?
        let runner = if isJust $ par options
              then mainSTMConduit (fromJust $ par options)
              else mainStreamingConduit
      
        -- necessary things to search files: search path, filters to use, search dir exists
        let filterFunc = makeProjectFilter options
        searchDir <- absolutize (searchPath options)
        itExists <- doesDirectoryExist searchDir
      
        -- Run it if it exists
        case itExists of
          False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
          True -> runner filterFunc searchDir
      
      -- Single-threaded version with bounded memory usage
      mainStreamingConduit :: ProjectFilter -> FilePath -> IO ()
      mainStreamingConduit filterFunc searchDir = do
        C.runConduitRes $
          sourceFilesFilter filterFunc searchDir .| C.mapM_C (liftIO . putStrLn)
      
      -- Multiple-threaded version of this program using channels from `stm-conduit`
      mainSTMConduit :: Int -> ProjectFilter -> FilePath -> IO ()
      mainSTMConduit nrWorkers filterFunc searchDir = do
        children <- newMVar []
        inChan <- atomically $ STMChan.newTBMChan 16
        _ <- forkIO . C.runResourceT $ do
               _ <- register $ atomically $ STMChan.closeTBMChan inChan
               C.runConduitRes $ C.sourceDirectoryDeep False searchDir .| STMChan.sinkTBMChan inChan True
        forM_ [1..nrWorkers] (\_ -> forkChild children $ runConduitChan inChan filterFunc)
        waitForChildren children
        return ()
      
      
      runConduitChan :: STMChan.TBMChan FilePath -> ProjectFilter -> IO ()
      runConduitChan inChan filterFunc = do
        C.runConduitRes $
             STMChan.sourceTBMChan inChan
             .| parseProject filterFunc
             .| C.mapM_C (liftIO . putStrLn)
      
      waitForChildren :: MVar [MVar ()] -> IO ()
      waitForChildren children = do
        cs <- takeMVar children
        case cs of
          []   -> return ()
          m:ms -> do
            putMVar children ms
            takeMVar m
            waitForChildren children
      
      forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
      forkChild children io = do
        mvar <- newEmptyMVar
        childs <- takeMVar children
        putMVar children (mvar:childs)
        forkFinally io (\_ -> putMVar mvar ())
      

      注意:我将stm-conduit 3.0.0conduit 1.12.1 一起使用,这就是我需要包含布尔参数的原因:

      STMChan.sinkTBMChan inChan True
      

      stm-conduit 的版本4.0.0 中,此函数自动关闭通道,因此布尔参数已被删除。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-06-27
        相关资源
        最近更新 更多