【问题标题】:Batching data before passing through the pipeline在通过管道之前对数据进行批处理
【发布时间】:2020-08-03 14:11:46
【问题描述】:

我有一堆文件共享,上面有数百万个文件/文件夹。我正在使用gci -Recurse 获取共享上的目录/文件的完整列表,并且我需要将该 gci 中的几条信息加载到 SQL 服务器中以进行额外分析。我用来获取数据的命令是:

gci $SharePath -Recurse | select FullName, Attributes, Length, CreationTimeUtc, LastAccessTimeUtc, LasWriteTimeUtc

现在我可以按照Microsoft's Write-SqlTableData documentation page 上的选项 3 中的建议,使用推荐的语法将其通过管道传输到 Write-SQLTableData,以强制批量插入,如下所示:

$Params = @{
    ServerInstance = 'sqlservername'
    DatabaseName = 'databasename'
    SchemaName = 'dbo'
}
,(gci $SharePath -Recurse | select FullName, Attributes, Length, CreationTimeUtc, LastAccessTimeUtc, LasWriteTimeUtc) | Write-SqlTableData @Params -TableName 'Table1'

然而,这样做的结果是 gci 需要几个小时才能完成,而没有任何反馈,并且在最终将所有数据转储到 SQL 之前,会耗尽许多 GB 的内存并让我的机器慢下来。如果我不使用 ,( 和匹配的 ),数据会在生成时移至 SQL,但是 SQL 服务器会被数百万个单独的插入操作。

我正在寻找的是使用管道的中间答案。我知道我可以将 gci 结果存储在变量 $gciresults 中,然后使用 $gciresults[0..999] 一次将 1000 行传递给 SQL 等等,但我正在尝试利用管道,所以我不会占用太多内存.理想情况下,我会调用一些 cmdlet batching-cmdlet,它允许我将传入的数据拆分成小块,而无需先将其全部存储在内存中,如下所示:

gci ... | select FullName, ... | batching-cmdlet -batchsize 1000 | Write-SqlTableData @Params -TableName 'Table1'

对此类 cmdlet 的搜索不成功。有没有人想过我可以如何做到这一点?

【问题讨论】:

    标签: powershell tsql chunking batching


    【解决方案1】:

    使用@mklement0 在他接受的答案中概述的框架,我编写了以下Split-PipelineData cmdlet,它接受管道输入并以用户可定义的批次向下传递。请注意,这与@mklement0 链接的帖子中的功能非常相似,但是我还添加了使用write-progress 报告进度的功能。

    <#
    .Synopsis
        Takes pipeline objects one at a time and sends them on in batches.
    .DESCRIPTION
        Takes pipeline objects one at a time and sends them on in batches.  Allows user selectable values for
        batch size and feedback options.
    #>
    Function Split-PipelineData
    {
        [CmdletBinding(DefaultParameterSetName='Default')]
        Param
        (
            # PipelineData
            [Alias('PipelineData')]
            [Parameter(ParameterSetName='Default',Mandatory=$true,ValueFromPipeline=$true,Position=0)]
            [Parameter(ParameterSetName='Progress',Mandatory=$true,ValueFromPipeline=$true,Position=0)]
            $InputObject,
    
            # Batch size for sending on to the pipeline
            [Parameter(ParameterSetName='Default',Mandatory=$false)]
            [Parameter(ParameterSetName='Progress',Mandatory=$false)]
            [int]$BatchSize=1000,
    
            # If set, Progress will use Write-Progress to display progress information
            [Parameter(ParameterSetName='Progress',Mandatory=$true)]
            [switch]$Progress,
    
            # Passthru to Write-Progress ID parameter
            [Parameter(ParameterSetName='Progress',Mandatory=$false)]
            [int]$ProgressID=0,
    
            # Passthru to Write-Progress ParentID parameter
            [Parameter(ParameterSetName='Progress',Mandatory=$false)]
            [int]$ProgressParentID=-1,
    
            # Passthru to Write-Progress Activity parameter. Default is 'Batching pipeline data'.
            [Parameter(ParameterSetName='Progress',Mandatory=$false)]
            [int]$ProgressActivity=$null,
    
            # Report progress after this many records.  Defaults to same as BatchSize
            [Parameter(ParameterSetName='Progress',Mandatory=$false)]
            [int]$ProgressBatchSize=$null,
    
            # Total Record count (if known) to be used in progress
            [Parameter(ParameterSetName='Progress',Mandatory=$false)]
            [int]$TotalRecords=$null
        )
    
        Begin
        {
            $Batch = [System.Collections.Generic.Queue[pscustomobject]]::new($BatchSize)
            [int64]$RecordCounter = 0
            If ($Progress)
            {
                $ProgressParams = @{
                    Activity = If ($ProgressActivity) {$ProgressActivity} Else {'Batching pipeline data'}
                    Status = ''
                    ID = $ProgressID
                    ParentID = $ProgressParentID
                    PercentComplete = -1
                }
                If ($ProgressBatchSize -in $null,0) {$ProgressBatchSize = $BatchSize}
            }
        }
        Process
        {
            $RecordCounter++
    
            #Add record to batch
            $Batch.Enqueue($_)
    
            #Report progress if necessary
            If ($Progress -and $RecordCounter % $ProgressBatchSize-eq 0)
            {
                If ($TotalRecords)
                {
                    $ProgressParams.Status = "Piping record $RecordCounter/$TotalRecords"
                    $ProgressParams.PercentComplete = [int](100*($RecordCounter/$TotalRecords))
                }
                Else
                {
                    $ProgressParams.Status = "Piping record $RecordCounter"
                }
                Write-Progress @ProgressParams
            }
    
            #Pass batch on if it has reached its threshhold
            if ($Batch.Count -eq $BatchSize)
            { 
                ,($Batch)
                $Batch.Clear() # start next batch
            }
        }
        End
        {
            #Report final progress if necessary
            If ($Progress)
            {
                If ($TotalRecords)
                {
                    $ProgressParams.Status = "Piping record $RecordCounter/$TotalRecords"
                    $ProgressParams.PercentComplete = [int](100)
                }
                Else
                {
                    $ProgressParams.Status = "Piping record $RecordCounter"
                }
                Write-Progress @ProgressParams
            }
    
            #Pass remaining records on and clear variable
            ,($Batch)
            $Batch.Clear()
            Remove-Variable Batch
    
            #Clear progress bars if necessary
            If ($Progress)
            {
                $ProgressParams.Activity = 'Completed'
                If ($ProgressParams.ContainsKey('Status')) {$ProgressParams.Remove('Status')}
                Write-Progress @ProgressParams -Completed
            }
        }
    }
    

    【讨论】:

    • 是的,本质上这与我的链接答案中的Select-Chunk 函数非常相似,但进度条支持是一个很好的补充。两个小问题:我建议将 $PipelineData 重命名为 $InputObject 以与其他 cmdlet 保持一致;动词 Split 有点令人困惑,因为您正在做的是 batchingchunking;虽然没有合适的认可动词,但您可以使用名词部分,例如Select-Batch(不要认为您需要名称中的“管道”一词)。
    • 我拆分了差异并更改了参数,但出于我自己的目的为“PipelineData”添加了一个参数别名。在我确定拆分之前,我在动词上犹豫了很多次,因为我们并没有真正选择任何东西,我们正在拆分一个流。我同意Chunk-Stream 可能是一个更好的描述,但我知道我个人在拆分/分块时不会想到Select。同意不同意那个?
    • 我明白你的意思,这里选择的任何名称都是一种妥协;我们不需要就名称达成一致,但我会留给您这样的想法:您可以将Select-Chunk 视为“从整个输入对象集中选择输入对象块”。 Split-Chunk 颠倒了这个观点。
    【解决方案2】:

    遗憾的是,从 PowerShell 7.0 开始,没有批处理(分区)机制。

    因此,您现在必须自己实现批处理:

    # Create an aux. queue for batching the objects.
    $batchSize = 1000
    $batch = [System.Collections.Generic.Queue[pscustomobject]]::new($batchSize)
    
    Get-ChildItem $SharePath -Recurse | 
      Select-Object FullName, Attributes, Length, CreationTimeUtc, LastAccessTimeUtc, LasWriteTimeUtc |
        ForEach-Object { 
          $batch.Enqueue($_) # add object to the batch
          if ($batch.Count -eq $batchSize) { # batch is full, write to table.
            # send batch as a whole through the pipeline
            , $batch | Write-SqlTableData @Params -TableName Table1
            $batch.Clear() # start next batch
          }
        }
    
    # Process any remaining items.
    if ($batch.Count) {
      , $batch | Write-SqlTableData @Params -TableName Table1
    }
    

    【讨论】:

    • 这种结构正是我想要的,但我不知道 .NET Queue Collections 类型。我打算把它变成一个 cmdlet 并将它添加到我的永久工具包中。以这种方式运行代码使我能够每 1000 行获得一次反馈,以了解我在进程中的位置,并且启动时内存效率很高。非常感谢您抽出宝贵时间来做这件事@mklement0!
    猜你喜欢
    • 1970-01-01
    • 2017-01-23
    • 2018-02-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-06
    • 1970-01-01
    相关资源
    最近更新 更多