【问题标题】:efficient way to do multi threaded calls to sql server?对sql server进行多线程调用的有效方法?
【发布时间】:2010-12-16 19:34:05
【问题描述】:

我有一个 sql 存储过程,它将调用表中的前 1000 条记录,该表的功能类似于队列 - 在此表中将或多或少 30,000-40,000 条记录。调用 SP 大约需要 4 秒(有一个 xml 列),因此要完成调用大约需要 2 分钟。 我想使用多线程调用并将记录插入同步字典\列表。 以前有人这样做过吗?有什么有效的方法可以尽快结束通话? 谢谢...

【问题讨论】:

  • 在使用线程之前考虑优化查询。
  • @John - 如果你把它作为答案发布,我会投赞成票 - 一些最好的建议。
  • @Donnie:完成了,有一些详细说明。
  • 您打算从多个线程中提取还是插入,我无法从您的问题中弄清楚...
  • 我在 DBA 的帮助下优化了查询。他说,由于使用了 xml 类型的列,他无法优化它超过 ~4 秒。我打算从表中提取,直到标记为最后一条记录的记录。

标签: c# sql-server multithreading


【解决方案1】:

考虑在使用线程之前优化查询。

根据我的经验,当多线程初学者实现线程时,通常不会提高性能。更糟糕的是,它通常会引入难以调试的细微错误。

先优化查询,你可能会发现不需要线程。

即使你实现了它们,最终你也会让 SQL Server 做太多的工作,而线程化的请求将不得不等待。

【讨论】:

  • 请注意查询是在优化索引之后由 DBA 完成的。
【解决方案2】:

基本错误是希望从多个线程插入数据库,并使用连接、锁使服务器超载,并最终使其瘫痪。

如果您正在读取数据,如果您找到一个执行速度更快并一次获取尽可能多数据的查询,您会做得更好。

在我看来,您的问题似乎无法在其层面上解决 - 也许如果您详细说明您想要做什么,您会得到更好的建议。

编辑:

我曾经使用 SQL 作为队列 - 我只是记得 - 要出列,您必须使用第一个查询的结果来获取第二个查询的输入,因此线程是不可能的。或者,您必须在数据库中标记您的排队数据“完成”,并且您的 READ 将变为 UPDATE -> 导致锁定。

如果您正在阅读,并且想尽快做出反应,您可以使用 DataReader,然后读取所有数据,并将您的处理分块到线程中 - 读取 100 条记录,分叉一个线程并将其传递给它。 ..然后是下一个记录,依此类推。这样您就可以平衡您的资源使用。

【讨论】:

  • 我会详细说明一下。我有一个获取记录的表。每组约 40,000 个标有 GUID。我想将 1,000 条记录的部分中的 40,000 条记录(由于 xml 列,此记录的 SELECT 需要约 4 秒才能达到高 IO)到 Windows 服务并做一些工作。记录的获取必须尽可能快。
  • 那么您打算如何在第一个查询运行时获取 NEXT 1000 条记录?
  • 好的,我看到了你的另一个答案 - 无论如何,我会在通过 DataReader 读取一行(或几行)时读取所有记录并分派工作。如果您的查询没有什么特别之处,那么第一行将很快准备好,并且您将在 IO 允许的范围内尽快获得每一行。
  • 无论如何,您的查询都不应该持续 4 秒。测量第一条记录的查询运行时间,并测量获取所有记录所需的时间。
  • 但是对 ExecuteReader 的调用不会也需要 4 秒吗?我会尝试使用 DataReader 并测量结果。我读过选择包含 xml 列的表很慢,所以我拿了 DBA 的话,这个查询不能改进。
【解决方案3】:

尝试使用DataReader异步读取数据;获取可以唯一标识数据库中行的列。填充队列以保存返回的数据值(自定义对象)并运行工作线程以针对队列执行任务。

您必须决定应该实现多少个工作线程来执行任务,因为线程有自己的开销,如果没有正确实现可能是一场噩梦。 p>

【讨论】:

  • 你有这样的工作的工作线程实现示例吗?问题是数据库队列在进程开始之前可能是空的,所以我不想激活不必要的多线程。
【解决方案4】:

如果你真的需要,你可以启动 BGWorkers,它们单独连接到服务器并报告他们的进度。

我为一个精心设计的导出/导入应用程序做了同样的事情来移动大约 50GB 的数据(4GB deflatestream'ed),除了我只使用 BGWorker 连续完成工作,而不是同时完成工作,没有锁定 UI 线程。 .

【讨论】:

    【解决方案5】:

    不清楚您是选择最近添加的 1000 行,还是选择特定列中具有最高值的 1000 行,也不清楚您的行是否是可变的 - 即一行可能符合条件昨天排名前 1000 名,但随后得到更新,因此今天不再符合条件。但是,如果各个行不可变,则可以为 TOP1000 设置一个单独的表,当第 1001 行插入其中时,插入后触发器会将第 1001 行(但是您确定该行)移动到 HISTORY 表.这将使选择几乎是即时的:从 TOP1000 中选择 *。当您需要查询 TOP1000 和 HISTORY 时,您只需将这两个表与一个 UNION 组合起来,就好像它们是一个表一样。或者,您可以将插入和第 1001 行删除包装在事务中,而不是触发器。

    但是,如果行发生变异,则可以使用不同的蠕虫,并且可以进出前 1000 名。

    【讨论】:

    • 嗨,我正在更新 TOP 1000 记录 (RecordID) 的状态并将它们插入到我正在从中进行选择的临时表中。
    • 那么,您正在更新基表中的 1000 行,然后再次选择这 1000 行以将它们插入到临时表中,然后执行 select * from temptable?顺便说一句,除了唯一性之外,依赖主键并不是最佳实践。
    【解决方案6】:
    public struct BillingData
    {
        public int CustomerTrackID, CustomerID;
        public DateTime BillingDate;
    }
    
    public Queue<BillingData> customerQueue = new Queue<BillingData>();
    volatile static int ThreadProcessCount = 0;
    readonly static object threadprocesslock = new object();
    readonly static object queuelock = new object();
    readonly static object countlock = new object();
    AsyncCallback asyncCallback
    
    // Pulling the Data Aync from the Database
    private void StartProcess()
    {
      SqlCommand command = SQLHelper.GetCommand("GetRecordsByBillingTrackID");
      command.Connection = SQLHelper.GetConnection("Con");SQLHelper.DeriveParameters(command);
      command.Parameters["@TrackID"].Value = trackid;
      asyncCallback = new AsyncCallback(FetchData);
      command.BeginExecuteXmlReader(asyncCallback, command);
    }
    
    public void FetchData(IAsyncResult c1)
        {
            SqlCommand comm1 = (SqlCommand)c1.AsyncState;
            System.Xml.XmlReader xr = comm1.EndExecuteXmlReader(c1);
            xr.Read();
            string data = "";
            while (!xr.EOF)
            {
            data = xr.ReadOuterXml();
            XmlDocument dom = new XmlDocument();
            dom.LoadXml("<data>" + data + "</data>");
            BillingData billingData;
            billingData.CustomerTrackID = Convert.ToInt32(dom.FirstChild.ChildNodes[0].Attributes["CustomerTrackID"].Value);
            billingData.CustomerID = Convert.ToInt32(dom.FirstChild.ChildNodes[0].Attributes["CustomerID"].Value);
            billingData.BillingDate = Convert.ToDateTime(dom.FirstChild.ChildNodes[0].Attributes["BillingDate"].Value);
    lock (queuelock)
    {
       if (!customerQueue.Contains(billingData))
       {
         customerQueue.Enqueue(billingData);
       }
    }
       AssignThreadProcessToTheCustomer();
    
    
    }
    
    
    
    xr.Close();
    
    
    
    }
    
    
    
    // Assign the Threads based on the data pulled
    private void AssignThreadProcessToTheCustomer()
    {
    int TotalProcessThreads =  5;
    int TotalCustomersPerThread = 5;
    if (ThreadProcessCount < TotalProcessThreads)
    {
    int ThreadsNeeded = (customerQueue.Count % TotalCustomersPerThread == 0) ?    (customerQueue.Count / TotalCustomersPerThread) : (customerQueue.Count / TotalCustomersPerThread + 1);
    int count = 0;
     if (ThreadsNeeded > ThreadProcessCount)
    {
     count = ThreadsNeeded - ThreadProcessCount;
     if ((count + ThreadProcessCount) > TotalProcessThreads)
        count = TotalProcessThreads - ThreadProcessCount;
    }
    for (int i = 0; i < count; i++)
    {
    ThreadProcess objThreadProcess = new ThreadProcess(this);
    ThreadPool.QueueUserWorkItem(objThreadProcess.BillingEngineThreadPoolCallBack, count);
    lock (threadprocesslock)
    {
     ThreadProcessCount++;
    }
    }
    
    public void BillingEngineThreadPoolCallBack(object threadContext)
    {
    BillingData? billingData = null;
    while (true)
    {
       lock (queuelock)
       {
          billingData = ProcessCustomerQueue();
       }
     if (billingData != null)
    {
     StartBilling(billingData.Value);
    }
    else
     break;
    
    More....
    }
    

    【讨论】:

      猜你喜欢
      • 2014-03-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-07-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多