【问题标题】:ConcurrentBag Skiping some items C#ConcurrentBag 跳过一些项目 C#
【发布时间】:2020-02-26 12:31:04
【问题描述】:

我正在使用 concurrentbag 来抓取 URL,现在它对于 500 / 100 个 url 工作正常,但是当我试图抓取 8000 个 url 时。所有 URL 都未处理,并且 inputQueue 中的某些项目处于待处理状态。

但我正在使用 while (!inputQueue.IsEmpty) 。因此,它应该循环运行,直到输入队列中存在任何项目。

我只想最多运行 100 个线程。所以,我首先创建 100 个线程并调用“Run()”方法,在该方法中我运行一个循环来获取项目,直到项目在输入队列中退出,并在抓取 url 后添加到输出队列中。

public ConcurrentBag<Data> inputQueue = new ConcurrentBag<Data>();
    public ConcurrentBag<Data> outPutQueue = new ConcurrentBag<Data>();

    public List<Data> Scrapes(List<Data> scrapeRequests)
    {
        ServicePointManager.ServerCertificateValidationCallback += (sender, cert, chain, sslPolicyErrors) => true;
        string proxy_session_id = new Random().Next().ToString();

        numberOfRequestSent = 0;

        watch.Start();

        foreach (var sRequest in scrapeRequests)
        {
            inputQueue.Add(sRequest);
        }
        //inputQueue.CompleteAdding();

        var taskList = new List<Task>();
        for (var i = 0; i < n_parallel_exit_nodes; i++) //create 100 threads only
        {
            taskList.Add(Task.Factory.StartNew(async () =>
            {
               await Run();
            }, TaskCreationOptions.RunContinuationsAsynchronously));
        }

        Task.WaitAll(taskList.ToArray());   //Waiting

        //print result
        Console.WriteLine("Number Of URLs Found - {0}", scrapeRequests.Count);
        Console.WriteLine("Number Of Request Sent - {0}", numberOfRequestSent);

        Console.WriteLine("Input Queue - {0}", inputQueue.Count);

        Console.WriteLine("OutPut Queue - {0}", outPutQueue.ToList().Count);
        Console.WriteLine("Success - {0}", outPutQueue.ToList().Where(x=>x.IsProxySuccess==true).Count().ToString());
        Console.WriteLine("Failed - {0}", outPutQueue.ToList().Where(x => x.IsProxySuccess == false).Count().ToString());
        Console.WriteLine("Process Time In - {0}", watch.Elapsed);

        return outPutQueue.ToList();
    }


    async Task<string> Run()
    {
        while (!inputQueue.IsEmpty)
        {
            var client = new Client(super_proxy_ip, "US");

            if (!client.have_good_super_proxy())
                client.switch_session_id();
            if (client.n_req_for_exit_node == switch_ip_every_n_req)
                client.switch_session_id();

            var scrapeRequest = new ProductResearch_ProData();
            inputQueue.TryTake(out scrapeRequest);

            try
            {
                numberOfRequestSent++;

                // Console.WriteLine("Sending request for - {0}", scrapeRequest.URL);
                scrapeRequest.HTML = client.DownloadString((string)scrapeRequest.URL);
                //Console.WriteLine("Response done for - {0}", scrapeRequest.URL);

                scrapeRequest.IsProxySuccess = true;

                outPutQueue.Add(scrapeRequest); //add object to output queue

                //lumanti code
                client.handle_response();
            }
            catch (WebException e)
            {
                Console.WriteLine("Failed");

                scrapeRequest.IsProxySuccess = false;
                Console.WriteLine(e.Message);
                outPutQueue.Add(scrapeRequest); //add object to output queue

                //lumanti code
                client.handle_response(e);
            }

            client.clean_connection_pool();
            client.Dispose();
        }

        return await Task.Run(() => "Done");
    }

【问题讨论】:

  • 为什么是return await Task.Run(() =&gt; "Done");
  • @canton7 我想我不能返回void,所以,使用它,它会起作用吗?
  • 它没有任何用处。您的 Run 方法是同步的。只需使其非异步,并使其返回string。 (你可以让它异步,但是你必须使用异步 DownloadString 方法,然后等待它。你还必须使用 Task.Run 而不是 Task.Factory.StartNew,因为 Task.Factory.StartNew 没有接受Func&lt;Task&gt; 的重载:目前这是您的代码中的一个问题,但由于您的Run 方法当前是同步的这一事实而被取消)
  • @Deepak 对于初学者来说,100 个线程太多了。一般来说,您使用的线程数不应超过处理器内核数。
  • 您应该能够使用异步 API 运行许多连接,而不会出现任何类型的 ThreadPool 膨胀。如果您的 ThreadPool 在处理网络内容时产生了额外的线程,那么这是一个巨大的暗示,表明您做错了什么。

标签: c# multithreading concurrency task-parallel-library


【解决方案1】:

这里有多个问题,但似乎没有一个是导致inputQueue.Count 末尾有非零值的原因。无论如何,我想指出我能看到的问题。

var taskList = new List<Task>();
for (var i = 0; i < n_parallel_exit_nodes; i++) // create 100 threads only
{
    taskList.Add(Task.Factory.StartNew(async () =>
    {
        await Run();
    }, TaskCreationOptions.RunContinuationsAsynchronously));
}

Task.Factory.StartNew 方法不理解异步委托,因此当使用异步 lambda 作为参数调用它时,它会返回一个嵌套任务。在这种情况下,它返回一个Task&lt;Task&lt;string&gt;&gt;。您将此嵌套任务存储在List&lt;Task&gt; 集合中,这是可能的,因为Task&lt;TResult&gt; 类型继承自Task 类型,但这样做您将失去等待内部任务完成(并获得结果)的能力.您只持有对外部任务的引用。奇迹般地,在这种情况下这不是问题(通常是),因为外部任务完成了所有工作,而内部任务基本上什么都不做(除了使用线程池线程返回不是真的"Done" 字符串任何地方都需要)。

您也没有将任何延续附加到外部任务,因此标志TaskCreationOptions.RunContinuationsAsynchronously 似乎是多余的。

// create 100 threads only

您不会创建 100 个线程,而是创建 100 个任务。这些任务被安排在ThreadPool 中,由于任务长时间运行,它们将立即被饿死,并且将开始每 500 毫秒注入一个新线程,直到所有计划任务都分配给一个线程。

var scrapeRequest = new ProductResearch_ProData();
inputQueue.TryTake(out scrapeRequest);

在这里,您实例化了一个ProductResearch_ProData 类型的对象,该对象立即被丢弃,并在下一行有资格进行垃圾回收。 TryTake 方法将返回从包中删除的对象,如果包为空,则返回 null。你忽略了TryTake方法的返回值,它完全有可能是false,因为同时这个包可能已经被另一个工人清空了,然后继续一个可能有空值的scrapeRequest,导致那个案例到NullReferenceException

值得注意的是,您从ConcurrentBag&lt;Data&gt; 中提取了ProductResearch_ProData 类型的对象,因此Data 类继承自 基类ProductResearch_ProData,或者代码中存在转录错误。

【讨论】:

    猜你喜欢
    • 2016-12-24
    • 2015-03-18
    • 1970-01-01
    • 2022-07-07
    • 2012-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-04
    相关资源
    最近更新 更多