【问题标题】:SSIS Custom Data Flow Component loop input pipeline buffer more than onceSSIS自定义数据流组件循环输入管道缓冲区不止一次
【发布时间】:2020-09-11 13:24:57
【问题描述】:

我仍在学习如何在 SSIS 中创建自定义组件。假设我的输入行数为 500 行,我需要从输入管道缓冲区一次读取/批处理/处理 100 行,然后将它们发送到第 3 方应用程序,一旦我得到结果,我需要更新使用新数据的管道缓冲列,然后读取/批处理/处理接下来的 100 行,依此类推,直到我处理完所有 500 行。

我的问题是,我能否多次循环/读取输入管道缓冲区,以便使用来自 3rd 方应用程序的返回数据更新缓冲区?

我以为我读到您可以读入所有数据并将其存储到缓存中,然后例如对数据进行排序,但我不确定如何将这些数据从缓存中取回输出。我也不确定应该在哪里完成以及如何访问输入管道缓冲区、PrimeOutput 或 ProcessInput 或其他我不知道的覆盖方法?

我正在尝试创建一个自定义异步数据流组件来解决这个问题。

任何帮助或想法将不胜感激和/或为我指明正确的方向!

谢谢!

【问题讨论】:

  • 返回和更新行与 SSIS 管道的想法背道而驰。为什么不将这些阶段分解为管道,以便 SSIS 可以应用自己的缓冲逻辑?如果需要,您的组件可以一次处理 100 行,然后只输出该行加上新/更新的数据。
  • @JeroenMostert 感谢您的回复!我不确定我是否完全理解。所以我可以创建一个新的管道缓冲区?或者一次循环遍历 ProcessInput() 中的输入管道缓冲区 100 行,然后将该批次发送给第 3 方,然后将返回的结果放入单独的输出缓冲区中,这样我就不必循环回输入缓冲区?抱歉,我还是新手,正在努力掌握这一切。
  • 好吧,没关系。我对 SSIS 的编程模型感到困惑(我已经有一段时间没有看到它了)。当然,您不能只根据需要将行流式传输到输出,这将是简单直观的。 :-P
  • SSIS 中的同步组件是在源行和输出之间具有 1:1 映射的组件。异步是您定义的任何内容。默认排序组件具有 1:1,但输出数据位于与输入缓冲区不同的缓冲区中。听起来您的流程最多批处理 100 行,做工作,将这 100 行 + 工作扩充添加到输出缓冲区。我没有看到您将在源行上“循环”的位置。你能扩展一下这个想法吗?
  • @billinkc 是的,我有一个返回 500 行的 Ole DB 源。我需要一次批处理 100 行并将它们批量发送到 Smarty Streets 地址验证 API,然后我将返回返回结果。从这里我被难住了,循环的想法是通过输入管道缓冲区返回并使用返回数据进行更新。听起来我不需要重用输入管道缓冲区,那么我需要在哪里或做什么才能将这些数据发送到输出缓冲区?我可以在哪里或如何访问此缓冲区?至于我将在哪里循环将在 ProcessInput() 方法中。

标签: sql-server ssis


【解决方案1】:

我很高兴我没有尝试徒手写这个,因为我忘记了很多细节。

这里有几点值得注意:我的两个数据结构InDataOutData,您需要配置它们以跟踪输入/输出缓冲区中的内容。正如 cmets 所说,可能有一种巧妙的方法可以克隆 Buffer 对象的属性,但我不知道如何去做。定义这些以匹配数据流中的数据类型,如果您像我一样懒惰,请使用相同的列名,您可以复制/粘贴成功。

ApiCall 是一个虚拟方法,它使用我们的缓存值来请求数据清理服务执行它的操作。它需要返回清理后的数据,以便我们可以将输入和输出合并到一个统一的行中。可能有更好的方法来做到这一点,但希望它足以让您的思维过程启动。

我创建了一个 SSIS 级别变量 @[User::ApiBatchSize],您可以将其初始化为 500。使用这种方法可以优化发送的批量大小,而无需更改核心代码。我在 PreExecute 方法中初始化了我们的本地成员变量,因为那是脚本组件的构造函数。

通常,在异步脚本组件中,您正在使用 ProcessInputRow 方法,这就是我最初所做的,但如果列表的大小是 apiBatchSize 的偶数倍,则在最后一批中遇到问题。事实证明,EndOfRowset() 在该方法中从未设置为 True。不用担心,我们只需要使用ProcessInput 方法。在“正常”世界中,流程输入法会导致流程输入行处理一行,因此我们将跳过中间人,直接使用 ProcessInput 中的缓冲区。我很懒,没有将我的 Row 引用重命名为 Buffer,因为自动生成的代码最初处理了该参数。

这里的伪逻辑是

  • 虽然有数据行
    • 如果我们达到了批量大小,请将我们的数据收集发送出去进行处理
      • 对于每个已处理的行,将一行添加到输出缓冲区并用干净的数据填充它
    • 清空我们的收集桶(它已经发送到下游)
  • 将当前行添加到我们的集合桶中

C# 本身

using System;
using System.Data;
using System.Collections.Generic;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer 
/// definition but  I don't know how to access it so I redefine it here
/// </summary>
public struct InData
{
    public string AddressLine1 { get; set; }
}

/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer 
/// definition but  I don't know how to access it so I redefine it here
/// </summary>
public struct OutData
{
    public string AddressLine1Clean { get; set; }
    public string AddressCityClean { get; set; }
    public string AddressStateClean { get; set; }
    public string AddressPostalCodeClean { get; set; }
}

/// <summary>
/// This is the class to which to add your code.  Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    List<InData> mData;
    int mBatchSize;

    /// <summary>
    /// This method is called once, before rows begin to be processed in the data flow.
    ///
    /// You can remove this method if you don't need to do anything here.
    /// </summary>
    public override void PreExecute()
    {
        base.PreExecute();

        this.mData = new List<InData>();
        this.mBatchSize = this.Variables.ApiBatchSize;
    }

    /// <summary>
    /// This method is called after all the rows have passed through this component.
    ///
    /// You can delete this method if you don't need to do anything here.
    /// </summary>
    public override void PostExecute()
    {
        base.PostExecute();

    }

    /// <summary>
    /// We're going to work with ProcessInput versus PorcessInputRow as it is
    /// "closer to the bare metal" and we need that
    /// </summary>
    /// <param name="Buffer"></param>
    public override void Input0_ProcessInput(Input0Buffer Row)
    {
        //base.Input0_ProcessInput(Buffer);

        while (Row.NextRow())
        {
            if (this.mData.Count >= this.mBatchSize)
            {
                foreach (var item in ApiCall())
                {
                    Output0Buffer.AddRow();
                    var inRow = item.Key;
                    var outRow = item.Value;

                    // fill columns with original data
                    Output0Buffer.AddressLine1 = inRow.AddressLine1;
                    // etc

                    // fill columns with clean data
                    Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
                    Output0Buffer.AddressCityClean = outRow.AddressCityClean;
                    Output0Buffer.AddressStateClean = outRow.AddressStateClean;
                    Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
                    // etc
                }

                // TODO Remove this for production, just ensuring batching is working as intended
                bool fireAgain = false;
                string status = "Batch released. Conditions => mDataCount := " + this.mData.Count;
                this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);

                // Reset for next iteration
                this.mData.Clear();
            }

            this.mData.Add(new InData() { AddressLine1 = Row.AddressLine1 });
        }

        // Handle the final possible partial batch
        if (this.mData.Count > 0)
        {
            foreach (var item in ApiCall())
            {
                Output0Buffer.AddRow();
                var inRow = item.Key;
                var outRow = item.Value;

                // fill columns with original data
                Output0Buffer.AddressLine1 = inRow.AddressLine1;
                // etc

                // fill columns with clean data
                Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
                Output0Buffer.AddressCityClean = outRow.AddressCityClean;
                Output0Buffer.AddressStateClean = outRow.AddressStateClean;
                Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
                // etc
            }

            // TODO Remove this for production, just ensuring batching is working as intended
            bool fireAgain = false;
            string status = "Final batch released. Conditions => mDataCount := " + this.mData.Count;
            this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);

            // Reset for next iteration
            this.mData.Clear();

        }
    }

    ///// <summary>
    ///// This method is called once for every row that passes through the component from Input0.
    ///// We need to preserve rows in our own memory allocation
    ///// We're not getting the EndOfRowset call in time to release the final
    ///// </summary>
    ///// <param name="Row">The row that is currently passing through the component</param>
    //public override void Input0_ProcessInputRow(Input0Buffer Row)
    //{
    //}

    public override void CreateNewOutputRows()
    {
        // I don't think we need to do anything special here
        // but I'm leaving it in in case you have some weird case
    }

    /// <summary>
    /// Simulate data cleaning
    /// </summary>
    /// <returns></returns>
    public Dictionary<InData, OutData> ApiCall()
    {
        int macGuffin = 0;
        Dictionary<InData, OutData> cleanData = new Dictionary<InData, OutData>();
        foreach (var item in this.mData)
        {
            cleanData.Add(item, new OutData() { AddressLine1Clean = "Clean" + item.AddressLine1, AddressCityClean = "Clean", AddressPostalCodeClean = "12345-1234", AddressStateClean = "CL"  });
            macGuffin = macGuffin % this.mBatchSize;
        }

        return cleanData;
    }

}

脚本组件的屏幕截图

这是我们为脚本组件提供 SSIS 级别变量的地方。我选择了 ApiBatchSize

在 Input Columns 选项卡中,我选择了所有需要通过的列,并将它们标记为 ReadOnly 使用类型。

在 Inputs and Outputs 选项卡中,我要做的第一件事是导航到 Output 0 并将 SynchronousInputID 从“Script Component.Inputs[Input 0]”之类的内容更改为 None

定义您需要的所有列。我复制了我的原始列(AddressLine1),然后添加了我的处理能够填充的所有新列(AddressLine1Clean,城市/州/邮政编码)。在 Output 0 下,选择 Output Columns 集合并反复按下“Add Column”并进行配置。除了提供名称之外,我在这里将所有数据类型更改为字符串 (DT_STR),因为这就是我正在使用的。默认为32位整数类型(DT_I4)

请注意,此屏幕截图中没有原始列,但您需要添加它才能使代码正常工作。

那里可能有更新的书籍,但是当我遇到脚本问题时,仍然参考了项目经理在引入 SSIS 时的这本绝版书籍。

Rational 脚本编写 SQL Server 2005 Integration Services Beta 预览指南(Rational 指南) 作者:唐纳德·法默,德里克·法默 平装本,192 页,2005 年出版 ISBN-10:1-932577-21-1 / 1932577211 ISBN-13:978-1-932577-21-1 / 9781932577211

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-12-25
    • 2012-09-17
    • 2016-12-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-03-12
    • 1970-01-01
    相关资源
    最近更新 更多