I'm glad I didn't try to write this freehand, because I'd forgotten a lot of the details.
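A few things worth noting. You need to configure my two data structures, InData and OutData, to track what's in your input/output buffers. As the comments say, there may be a clever way to clone the properties of the Buffer object, but I don't know how to do it. Define these to match the data types in your data flow, and if you're lazy like me, reuse the same column names so copy/paste just works.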
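ApiCall is a dummy method that uses our cached values to ask the data-cleaning service to do its thing. It needs to return the cleaned data so we can merge the input and output into a single unified row. There may be a better way to do this, but hopefully it's enough to get your thought process started.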
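To make the "merge input and output into one row" step concrete, here's a minimal standalone sketch. AddressIn, AddressOut, CleanAddresses and Merge are hypothetical names for illustration only, and it assumes the cleaning service hands back exactly one result per input row, in the same order. Pairing results by position, rather than looking them up by input value, means two identical input addresses still come back as two output rows.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the InData/OutData structs.
public struct AddressIn
{
    public string AddressLine1 { get; set; }
}

public struct AddressOut
{
    public string AddressLine1Clean { get; set; }
}

public static class BatchMerge
{
    // Hypothetical cleaning service: one result per input row, same order.
    public static List<AddressOut> CleanAddresses(IReadOnlyList<AddressIn> batch)
    {
        return batch
            .Select(a => new AddressOut { AddressLine1Clean = (a.AddressLine1 ?? "").Trim().ToUpperInvariant() })
            .ToList();
    }

    // Pair each input row with its cleaned result by position, so duplicate
    // inputs still produce one output row each.
    public static List<KeyValuePair<AddressIn, AddressOut>> Merge(IReadOnlyList<AddressIn> batch)
    {
        List<AddressOut> cleaned = CleanAddresses(batch);
        if (cleaned.Count != batch.Count)
        {
            throw new InvalidOperationException(
                $"Service returned {cleaned.Count} rows for a batch of {batch.Count}");
        }
        return batch
            .Zip(cleaned, (input, output) => new KeyValuePair<AddressIn, AddressOut>(input, output))
            .ToList();
    }
}
```

Merge(batch) returns one (input, cleaned) pair per row; if the service ever returns a different number of rows, it throws instead of silently dropping or misaligning data.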
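I created an SSIS-level variable, @[User::ApiBatchSize], which you can initialize to 500. This lets you tune the size of the batches being sent without touching the core code. I initialize our local member variables in the PreExecute method, since it plays the role of the Script Component's constructor.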
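As a quick check on what the batch size controls, this sketch computes the sizes of the calls made for a given row count. BatchPlan.BatchSizes is a hypothetical helper, not part of SSIS. With ApiBatchSize at 500, 1,234 rows become three calls of 500, 500 and 234. 1,000 rows is an exact multiple, so there is no partial remainder: the last call is a full batch of 500 that still has to be released after the final row.

```csharp
using System;
using System.Collections.Generic;

public static class BatchPlan
{
    // Sizes of the API calls made for rowCount rows at the given batch size.
    // Every call is full except possibly the last, which holds the remainder.
    public static List<int> BatchSizes(int rowCount, int batchSize)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var sizes = new List<int>();
        for (int remaining = rowCount; remaining > 0; remaining -= batchSize)
        {
            sizes.Add(Math.Min(batchSize, remaining));
        }
        return sizes;
    }
}
```

BatchSizes(1234, 500) returns [500, 500, 234], and BatchSizes(1000, 500) returns [500, 500].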
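Normally in an asynchronous Script Component you work in the ProcessInputRow method, and that's what I did at first, but I ran into problems with the last batch when the number of rows was an exact multiple of apiBatchSize. It turns out EndOfRowset() never came back True in that method. No worries, we just need to use the ProcessInput method instead. In the "normal" world, ProcessInput calls ProcessInputRow once for each row, so we'll cut out the middleman and work with the buffer directly in ProcessInput. I was lazy and didn't rename my Row reference to Buffer, a leftover from the auto-generated ProcessInputRow code I started from.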
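Here's a toy model of why the per-row hook misses the end of the data. ToyBuffer and ToyBatcher are hypothetical classes, not the SSIS ScriptBuffer API, and the model assumes the data flow can close with a buffer that carries no rows but has its end-of-rowset flag set. A per-row hook is only invoked when there is a row, so it never gets a chance to look at that closing buffer; a per-buffer hook does. With four rows and a batch size of two (an exact multiple), the per-row version strands the last batch.

```csharp
using System.Collections.Generic;

// Toy model of a data-flow buffer; NOT the SSIS ScriptBuffer API.
// Assumption: the last buffer may arrive with zero rows and EndOfRowset set.
public sealed class ToyBuffer
{
    public ToyBuffer(bool endOfRowset, params string[] rows)
    {
        EndOfRowset = endOfRowset;
        Rows = rows;
    }

    public bool EndOfRowset { get; }
    public string[] Rows { get; }
}

public sealed class ToyBatcher
{
    private readonly int _batchSize;
    private readonly List<string> _pending = new List<string>();

    public ToyBatcher(int batchSize) { _batchSize = batchSize; }

    // Number of rows sent off in batches so far.
    public int ReleasedRows { get; private set; }

    // Per-row hook, the analogue of Input0_ProcessInputRow: it only runs when
    // there is a row, so it can only see the flag on a buffer that has rows.
    public void ProcessInputRow(ToyBuffer buffer, string row)
    {
        if (_pending.Count >= _batchSize) Release();
        _pending.Add(row);
        if (buffer.EndOfRowset) Release();
    }

    // Per-buffer hook, the analogue of Input0_ProcessInput: it runs for every
    // buffer, including an empty closing one.
    public void ProcessInput(ToyBuffer buffer)
    {
        foreach (string row in buffer.Rows)
        {
            if (_pending.Count >= _batchSize) Release();
            _pending.Add(row);
        }
        if (buffer.EndOfRowset && _pending.Count > 0) Release();
    }

    private void Release()
    {
        ReleasedRows += _pending.Count;
        _pending.Clear();
    }
}
```

Fed the buffers ("a", "b"), ("c", "d") and then an empty closing buffer, the per-row driver releases only 2 rows, while the per-buffer driver releases all 4.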
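The pseudo-logic here is:
- While there are rows of data
    - If we've reached the batch size, send our data collection off for processing
        - For each processed row, add a row to the output buffer and fill it with the clean data
    - Add the current row to our collection bucket
- Once the rows run out, send whatever is left in the bucket off the same way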
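The same steps can be sketched outside SSIS as a generic helper, which makes the batching easy to test on its own. Batching.ProcessInBatches is a hypothetical name; processBatch stands in for ApiCall and is assumed to return one result per row it is given. Here the end of the input is simply the end of the IEnumerable, which sidesteps the buffer problem entirely.

```csharp
using System;
using System.Collections.Generic;

public static class Batching
{
    // Collect rows into a bucket, send a full bucket off for processing before
    // adding the next row, keep one result per processed row, and send the
    // final partial bucket once the input ends.
    public static List<TOut> ProcessInBatches<TIn, TOut>(
        IEnumerable<TIn> rows,
        int batchSize,
        Func<List<TIn>, IEnumerable<TOut>> processBatch)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var output = new List<TOut>();
        var bucket = new List<TIn>(batchSize);

        foreach (TIn row in rows)
        {
            if (bucket.Count >= batchSize)
            {
                // AddRange enumerates the results before the bucket is cleared
                output.AddRange(processBatch(bucket));
                bucket.Clear();
            }
            bucket.Add(row);
        }

        // Handle the final possible partial batch
        if (bucket.Count > 0)
        {
            output.AddRange(processBatch(bucket));
        }
        return output;
    }
}
```

With seven rows and a batch size of 3, processBatch is called with 3, 3 and then 1 row; with six rows it is called exactly twice.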
The C# itself
using System;
using System.Data;
using System.Collections.Generic;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct InData
{
    public string AddressLine1 { get; set; }
}

/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct OutData
{
    public string AddressLine1Clean { get; set; }
    public string AddressCityClean { get; set; }
    public string AddressStateClean { get; set; }
    public string AddressPostalCodeClean { get; set; }
}

/// <summary>
/// This is the class to which to add your code. Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    // Rows collected since the last batch was sent to the API
    List<InData> mData;
    // Rows per API call, read from @[User::ApiBatchSize]
    int mBatchSize;

    /// <summary>
    /// This method is called once, before rows begin to be processed in the data flow.
    ///
    /// You can remove this method if you don't need to do anything here.
    /// </summary>
    public override void PreExecute()
    {
        base.PreExecute();
        this.mData = new List<InData>();
        // Guard against a zero or negative variable value, which would
        // otherwise release an empty batch for every row
        this.mBatchSize = Math.Max(1, this.Variables.ApiBatchSize);
    }

    /// <summary>
    /// This method is called after all the rows have passed through this component.
    ///
    /// You can delete this method if you don't need to do anything here.
    /// </summary>
    public override void PostExecute()
    {
        base.PostExecute();
    }

    /// <summary>
    /// We're going to work with ProcessInput rather than ProcessInputRow, as it is
    /// "closer to the bare metal" and we need that: it runs once per pipeline buffer,
    /// including the final one, so we can see EndOfRowset()
    /// </summary>
    /// <param name="Row">The current input buffer (I kept the name Row from the per-row override)</param>
    public override void Input0_ProcessInput(Input0Buffer Row)
    {
        // Deliberately not calling base.Input0_ProcessInput(Row): the generated base
        // implementation walks the buffer and hands each row to Input0_ProcessInputRow
        while (Row.NextRow())
        {
            if (this.mData.Count >= this.mBatchSize)
            {
                this.ReleaseBatch("Batch released");
            }
            this.mData.Add(new InData() { AddressLine1 = Row.AddressLine1 });
        }

        // Handle the final possible partial batch. ProcessInput is called once per
        // buffer, not once per rowset, so wait for the last buffer (EndOfRowset() is
        // a method on the script buffer in SSIS 2008 and later) before flushing;
        // otherwise every buffer boundary would release an undersized batch
        if (Row.EndOfRowset() && this.mData.Count > 0)
        {
            this.ReleaseBatch("Final batch released");
        }
    }

    /// <summary>
    /// Send the collected rows to the API, write one output row per result,
    /// then empty the collection for the next batch
    /// </summary>
    /// <param name="label">Text for the diagnostic message</param>
    private void ReleaseBatch(string label)
    {
        foreach (var item in ApiCall())
        {
            Output0Buffer.AddRow();
            var inRow = item.Key;
            var outRow = item.Value;
            // fill columns with original data
            Output0Buffer.AddressLine1 = inRow.AddressLine1;
            // etc
            // fill columns with clean data
            Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
            Output0Buffer.AddressCityClean = outRow.AddressCityClean;
            Output0Buffer.AddressStateClean = outRow.AddressStateClean;
            Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
            // etc
        }

        // TODO Remove this for production, just ensuring batching is working as intended
        bool fireAgain = false;
        string status = label + ". Conditions => mDataCount := " + this.mData.Count;
        this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);

        // Reset for next iteration
        this.mData.Clear();
    }

    ///// <summary>
    ///// This method is called once for every row that passes through the component from Input0.
    ///// We need to preserve rows in our own memory allocation
    ///// We're not getting the EndOfRowset call in time to release the final batch
    ///// </summary>
    ///// <param name="Row">The row that is currently passing through the component</param>
    //public override void Input0_ProcessInputRow(Input0Buffer Row)
    //{
    //}

    public override void CreateNewOutputRows()
    {
        // I don't think we need to do anything special here,
        // but I'm leaving it in in case you have some weird case
    }

    /// <summary>
    /// Simulate data cleaning
    /// </summary>
    /// <returns>One (input, cleaned output) pair per buffered row, in input order</returns>
    public List<KeyValuePair<InData, OutData>> ApiCall()
    {
        // A list of pairs rather than a Dictionary: InData is a struct, so two rows
        // with the same address would be equal keys and Dictionary.Add would throw
        var cleanData = new List<KeyValuePair<InData, OutData>>();
        foreach (var item in this.mData)
        {
            cleanData.Add(new KeyValuePair<InData, OutData>(item, new OutData()
            {
                AddressLine1Clean = "Clean" + item.AddressLine1,
                AddressCityClean = "Clean",
                AddressPostalCodeClean = "12345-1234",
                AddressStateClean = "CL"
            }));
        }
        return cleanData;
    }
}
Script Component screenshots
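This is where we give the Script Component access to the SSIS-level variable. I selected ApiBatchSize under ReadOnlyVariables; the code reads it in PreExecute, and read/write variables are only available in PostExecute.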
On the Input Columns tab, I selected every column that needs to pass through and set its usage type to ReadOnly.
On the Inputs and Outputs tab, the first thing I do is navigate to Output 0 and change SynchronousInputID from something like "Script Component.Inputs[Input 0]" to None. That is what makes the output asynchronous.
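Define all the columns you need. I copied my original column (AddressLine1) and then added every new column my processing can populate (AddressLine1Clean, City/State/PostalCode). Under Output 0, select the Output Columns collection, then click "Add Column" repeatedly and configure each one. Besides giving each column a name, I changed all the data types here to string (DT_STR), since that's what I'm working with. The default is the 32-bit integer type (DT_I4).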
Note that the original column isn't in this screenshot, but you need to add it for the code to work.
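There are probably newer books out there, but when I run into scripting problems I still refer back to this out-of-print book by the SSIS program manager from when the product was introduced.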
The Rational Guide to Scripting SQL Server 2005 Integration Services Beta Preview (Rational Guides)
By Donald Farmer, Derek Farmer
Paperback, 192 pages, published 2005
ISBN-10: 1-932577-21-1 / 1932577211
ISBN-13: 978-1-932577-21-1 / 9781932577211