【问题标题】:Parallel request to scrape multiple pages of a website抓取网站多个页面的并行请求
【发布时间】:2017-09-18 17:10:25
【问题描述】:

我想抓取一个包含大量包含有趣数据的页面的网站,但由于源非常大,我想使用多线程并限制过载。 我使用Parallel.ForEach 来启动 10 个任务的每个块,并在主 for 循环中等待,直到启动的活动线程数降至阈值以下。为此,我使用了一个活动线程计数器,当使用WebClient 启动一个新线程时我会递增,并在WebClientDownloadStringCompleted 事件被触发时递减。

最初的问题是如何使用DownloadStringTaskAsync 而不是DownloadString 并等待在Parallel.ForEach 中启动的每个线程都已完成。这已通过解决方法解决: 主循环中的计数器(activeThreads)和Thread.Sleep

使用await DownloadStringTaskAsync 而不是DownloadString 是否应该通过在等待DownloadString 数据到达时释放线程来提高速度?

回到最初的问题,有没有办法使用 TPL 更优雅地做到这一点,而无需涉及计数器的解决方法?

private static volatile int activeThreads = 0;

public static void RecordData()
{
  var nbThreads = 10;
  var source = db.ListOfUrls; // Thousands urls
  var iterations = source.Length / groupSize; 
  for (int i = 0; i < iterations; i++)
  {
    var subList = source.Skip(groupSize* i).Take(groupSize);
    Parallel.ForEach(subList, (item) => RecordUri(item)); 
    //I want to wait here until process further data to avoid overload
    while (activeThreads > 30) Thread.Sleep(100);
  }
}

private static async Task RecordUri(Uri uri)
{
   using (WebClient wc = new WebClient())
   {
      Interlocked.Increment(ref activeThreads);
      wc.DownloadStringCompleted += (sender, e) => Interlocked.Decrement(ref iterationsCount);
      var jsonData = "";
      RootObject root;
      jsonData = await wc.DownloadStringTaskAsync(uri);
      var root = JsonConvert.DeserializeObject<RootObject>(jsonData);
      RecordData(root)
    }
}

【问题讨论】:

    标签: c# multithreading async-await webclient parallel.foreach


    【解决方案1】:

    如果您想要一个优雅的解决方案,您应该使用 Microsoft 的 Reactive Framework。很简单:

    var source = db.ListOfUrls; // Thousands urls
    
    var query =
        from uri in source.ToObservable()
        from jsonData in Observable.Using(
            () => new WebClient(),
            wc => Observable.FromAsync(() => wc.DownloadStringTaskAsync(uri)))
        select new { uri, json = JsonConvert.DeserializeObject<RootObject>(jsonData) };
    
    IDisposable subscription =
        query.Subscribe(x =>
        {
            /* Do something with x.uri && x.json */
        });
    

    这就是整个代码。它是很好的多线程,并且受到控制。

    只需 NuGet "System.Reactive" 即可获取位。

    【讨论】:

    • 我想要一些没有额外扩展的东西,但这很有趣。不知道为什么,但它不能按原样工作。它给了我一个空的json
    • @sofsntp - 当我测试它时,它对我来说效果很好。您是否在 subscribe 方法中检查了您获得了 URI,但没有检查到 JSON?也许尝试更改代码,以便在订阅内部之前它不会反序列化 JSON。
    • 好的,它有效,但这样做有什么意义?是什么让它优雅?
    • @sofsntp - 它成为异步多线程的单线方式,具有所有一次性清理,并且使用 LINQ。试一试,再优雅不过了。
    【解决方案2】:
    Parallel.ForEach
    

    将创建 ProcessorCount 任务来执行源 Enumerable 中每个项目的函数。它会注意没有太多任务,并等待所有项目和任务被执行。

    Task.WhenAll
    

    只等待给定的任务它不执行它们。您可以以适当的方式执行它们,而不是一次执行很多。

    但是您的代码中有一些错误。 RecordUri 函数将返回一个必须等​​待的任务,否则 ForEach 将创建越来越多的任务,因为该函数永远不会知道当前任务何时完成。还有一个问题是你在一个任务中创建一个任务,第一个任务什么都不做,然后等待第一个。

    您可能还想看看Parallel.ForEach 的重载 https://msdn.microsoft.com/en-us/library/dd782934(v=vs.110).aspx

    编辑

    是否使用 await DownloadStringTaskAsync 而不是 DownloadString 应该通过在等待 DownloadString 数据到达时释放线程来提高所有速度?

    没有。当任务正在等待外部资源时,它会进入暂停状态(未使用某些旧/脏迭代等待的 Windows api)。所以没有太大区别。 不同的是编译器在编译异步代码时会产生的开销。 DownloadStringTaskAsync 将创建一个包含长操作的任务。如果您使用 await 它,您会将自己附加到该任务(通过 ContinueWith)。因此,您只需创建一个任务来等待另一个任务。这是我在上面的文字中所说的开销。

    我的方法是:在 Parallel.ForEach 中使用 synchronous method。线程将由 PLinq 完成,您可以继续。

    记住“吻”

    【讨论】:

    • 谢谢,但我已阅读相关资源。为了更清楚,我编辑了我的问题。我的问题是使用 DownloadStringTaskAsync 意味着该方法变为异步。我应该在 parell.foreach() 中使用 .wait() 吗?但我已阅读以避免这种情况
    • @sofsntp 添加编辑
    • 是的,对可扩展性的担忧是绝对正确的。但这不是问题所在。你将来会扩大规模吗?他的回答的问题在于,当您使用 HttpClient 时,您会受到更多的 url 限制:stackoverflow.com/questions/21558109/… 如果您只是创建越来越多的请求而不受您的限制,这将对性能产生负面影响。
    • 我不打算扩大规模。我只是想了解最佳实践并了解其背后的运作方式。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-05-23
    • 1970-01-01
    • 2020-10-09
    • 1970-01-01
    • 1970-01-01
    • 2022-11-09
    • 2022-01-18
    相关资源
    最近更新 更多