【问题标题】:SSIS Script Transformation - Performing WebClient Async OperationSSIS 脚本转换 - 执行 WebClient 异步操作
【发布时间】:2014-04-17 03:42:53
【问题描述】:

我收到了 6 个不同域/主机的唯一 URL,解析它们返回的 XML,并插入到表中。我的工作正常,除了它同步地执行每个服务器(一次一个)。我需要它同时访问所有六台服务器。

我将 WebClient 调用更改为使用 Async 方法,然后添加了一个 while 语句来检查是否所有 WebClient 调用都已返回,但是 SSIS 包似乎在 while 语句完成后被取消,无论我是否调用了 SetEndOfRowSet 或不是。 SSIS 日志中不显示任何错误(即使选择了所有日志选项)。它在输出窗口中显示“已取消”,当它发生时会弹出一个简短的控制台窗口,但我无法捕捉它。

如果我在 SSIS 包中没有 while 语句,则包会继续经过我的脚本转换,并且不会从中输出任何行。 SynchronousInputID 上的脚本转换设置为无。

这是 SSIS C# 脚本转换代码:

/* Microsoft SQL Server Integration Services Script Component
*  Write scripts using Microsoft Visual C# 2008.
*  ScriptMain is the entry point class of the script.*/

using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Net;
using System.Diagnostics;
using System.Xml;

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent

{
    private int CountComplete = 0;

public override void PreExecute()
{
    base.PreExecute();
}

public override void PostExecute()
{
    base.PostExecute();
}

public override void Input0_ProcessInput(Input0Buffer Buffer)
{
    while(Buffer.NextRow())
    {
        Input0_ProcessInputRow(Buffer);
    }

    if (Buffer.EndOfRowset())
    {
        while (CountComplete < Variables.intRows)
        {
            System.Threading.Thread.Sleep(500);
        }
        Output0Buffer.SetEndOfRowset();
    }
}

public override void Input0_ProcessInputRow(Input0Buffer Row)
{
    string DNS_NA = Row.DNSNA;
    long SERVER = Convert.ToInt64(Row.SERSYSNR);

    // Ignore server/certificate mismatch...
    ServicePointManager.ServerCertificateValidationCallback += (sender, certificate, chain, sslPolicyErrors) => true;

    // Declare variable that will hold the xml document received by the API
    string xmlDoc = String.Empty;

    // Get the URI from the variable
    string url = @"https://" + DNS_NA + Variables.strURLSuffix;

    // Time the request...
    Stopwatch timer = Stopwatch.StartNew();
    //Create a Web Client
    WebClient client = new WebClient();
    client.DownloadStringCompleted += (sender, e) =>
    {
        timer.Stop();
        processRow(e.Result, SERVER, Convert.ToInt32(timer.ElapsedMilliseconds), Convert.ToInt32(Row.UPTIMQY), Output0Buffer);
        CountComplete += 1;
    };
    Uri uri = new Uri(url);
    client.DownloadStringAsync(uri);
}

private void processRow(string xmlDoc, long SERVER, int FRAPI_TIME, int PRE_UPTIME, Output0Buffer buffer)
{
    try
    {
        XmlDocument xDoc = new XmlDocument();

        xDoc.LoadXml(xmlDoc);

        bool NORESPONSE = false; // If we made it this far, we got a response, right?
        short CPU = Convert.ToInt16(xDoc.SelectSingleNode("//server").ChildNodes[1].InnerText);
        int TOT_MEM = Convert.ToInt32(xDoc.SelectSingleNode("//server").ChildNodes[2].InnerText) / (1024 * 1024);
        int USE_MEM = Convert.ToInt32(xDoc.SelectSingleNode("//server").ChildNodes[3].InnerText) / (1024 * 1024);
        int UPTIME = Convert.ToInt32(xDoc.SelectSingleNode("//server").ChildNodes[4].InnerText) / 1000 / 60;
        short REQUESTS = Convert.ToInt16(xDoc.SelectSingleNode("//server").ChildNodes[5].InnerText);
        bool RESET = (UPTIME < PRE_UPTIME) ? true : false;


        foreach (XmlNode xNode in xDoc.SelectNodes("//server/detail/request"))
        {
            buffer.AddRow();
            buffer.SERSYSNR = SERVER;
            buffer.CPUUSEQY = CPU;
            buffer.TOTMEMQY = TOT_MEM;
            buffer.USEMEMQY = USE_MEM;
            buffer.UPTIMQY = UPTIME;
            buffer.REQCNTQY = REQUESTS;
            buffer.REQMSQY = FRAPI_TIME;
            buffer.SERNONRSPIR = NORESPONSE;
            buffer.SERRSTIR = RESET;
            buffer.REQSYSNR = Convert.ToInt32(xNode.ChildNodes[0].InnerText);
            buffer.REQIPTE = xNode.ChildNodes[1].InnerText;
            buffer.REQURLTE = xNode.ChildNodes[2].InnerText;
            buffer.REQRUNTMQY = Convert.ToInt32(xNode.ChildNodes[3].InnerText);

        }
    }
    catch
    {
        // Hopefully we didn't throw an exception inside the foreach where a row has already been added...
        buffer.AddRow();
        buffer.SERSYSNR = SERVER;
        buffer.SERNONRSPIR = true;
    }

}

   // public override void CreateNewOutputRows()
   // {
        /*
          Add rows by calling the AddRow method on the member variable named "<Output Name>Buffer".
          For example, call MyOutputBuffer.AddRow() if your output was named "MyOutput".
        */
    //}

}

我可以在 DNS_NA 上使用条件拆分并制作 x 个脚本组件,然后将它们全部合并,但每次将新服务器添加到列表中时,我都必须编辑 SSIS 包。不理想。

【问题讨论】:

    标签: c# sql sql-server ssis


    【解决方案1】:

    相关连接错误报告:https://connect.microsoft.com/SQLServer/feedback/details/367692/ssis-detects-end-of-data-flow-components-in-the-wrong-way

    拆分成 6 个脚本任务是唯一可行的解​​决方案。通过切换到线程设置并仅调用 DownloadString,我在使用 DownloadStringSync 时确实修复了 SSIS 崩溃。我必须确保在处理结果时锁定了 Output0Buffer,这让我有点难过。在我检查了 ProcessInputBuffer 中的 Buffer.EndOfRowSet() 之后,我最终使用了一个列表来存储线程并循环它们并调用 Thread.Join()。

    但是,该解决方案在所有请求完成之前仍不会转发行,这意味着如果一台服务器运行很长时间,则在所有请求都返回之前不会将结果放入数据库。

    【讨论】:

      猜你喜欢
      • 2011-03-20
      • 2019-05-27
      • 1970-01-01
      • 1970-01-01
      • 2015-04-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-06-23
      相关资源
      最近更新 更多