【发布时间】:2020-02-26 12:31:04
【问题描述】:
我正在使用 concurrentbag 来抓取 URL,现在它对于 500 / 100 个 url 工作正常,但是当我试图抓取 8000 个 url 时。所有 URL 都未处理,并且 inputQueue 中的某些项目处于待处理状态。
但我正在使用 while (!inputQueue.IsEmpty) 。因此,它应该循环运行,直到输入队列中存在任何项目。
我只想最多运行 100 个线程。所以,我首先创建 100 个线程并调用“Run()”方法,在该方法中我运行一个循环来获取项目,直到项目在输入队列中退出,并在抓取 url 后添加到输出队列中。
public ConcurrentBag<Data> inputQueue = new ConcurrentBag<Data>();
public ConcurrentBag<Data> outPutQueue = new ConcurrentBag<Data>();
public List<Data> Scrapes(List<Data> scrapeRequests)
{
ServicePointManager.ServerCertificateValidationCallback += (sender, cert, chain, sslPolicyErrors) => true;
string proxy_session_id = new Random().Next().ToString();
numberOfRequestSent = 0;
watch.Start();
foreach (var sRequest in scrapeRequests)
{
inputQueue.Add(sRequest);
}
//inputQueue.CompleteAdding();
var taskList = new List<Task>();
for (var i = 0; i < n_parallel_exit_nodes; i++) //create 100 threads only
{
taskList.Add(Task.Factory.StartNew(async () =>
{
await Run();
}, TaskCreationOptions.RunContinuationsAsynchronously));
}
Task.WaitAll(taskList.ToArray()); //Waiting
//print result
Console.WriteLine("Number Of URLs Found - {0}", scrapeRequests.Count);
Console.WriteLine("Number Of Request Sent - {0}", numberOfRequestSent);
Console.WriteLine("Input Queue - {0}", inputQueue.Count);
Console.WriteLine("OutPut Queue - {0}", outPutQueue.ToList().Count);
Console.WriteLine("Success - {0}", outPutQueue.ToList().Where(x=>x.IsProxySuccess==true).Count().ToString());
Console.WriteLine("Failed - {0}", outPutQueue.ToList().Where(x => x.IsProxySuccess == false).Count().ToString());
Console.WriteLine("Process Time In - {0}", watch.Elapsed);
return outPutQueue.ToList();
}
async Task<string> Run()
{
while (!inputQueue.IsEmpty)
{
var client = new Client(super_proxy_ip, "US");
if (!client.have_good_super_proxy())
client.switch_session_id();
if (client.n_req_for_exit_node == switch_ip_every_n_req)
client.switch_session_id();
var scrapeRequest = new ProductResearch_ProData();
inputQueue.TryTake(out scrapeRequest);
try
{
numberOfRequestSent++;
// Console.WriteLine("Sending request for - {0}", scrapeRequest.URL);
scrapeRequest.HTML = client.DownloadString((string)scrapeRequest.URL);
//Console.WriteLine("Response done for - {0}", scrapeRequest.URL);
scrapeRequest.IsProxySuccess = true;
outPutQueue.Add(scrapeRequest); //add object to output queue
//lumanti code
client.handle_response();
}
catch (WebException e)
{
Console.WriteLine("Failed");
scrapeRequest.IsProxySuccess = false;
Console.WriteLine(e.Message);
outPutQueue.Add(scrapeRequest); //add object to output queue
//lumanti code
client.handle_response(e);
}
client.clean_connection_pool();
client.Dispose();
}
return await Task.Run(() => "Done");
}
【问题讨论】:
-
为什么是
return await Task.Run(() => "Done");? -
@canton7 我想我不能返回void,所以,使用它,它会起作用吗?
-
它没有任何用处。您的
Run方法是同步的。只需使其非异步,并使其返回string。 (你可以让它异步,但是你必须使用异步DownloadString方法,然后等待它。你还必须使用Task.Run而不是Task.Factory.StartNew,因为Task.Factory.StartNew没有接受Func<Task>的重载:目前这是您的代码中的一个问题,但由于您的Run方法当前是同步的这一事实而被取消) -
@Deepak 对于初学者来说,100 个线程太多了。一般来说,您使用的线程数不应超过处理器内核数。
-
您应该能够使用异步 API 运行许多连接,而不会出现任何类型的 ThreadPool 膨胀。如果您的 ThreadPool 在处理网络内容时产生了额外的线程,那么这是一个巨大的暗示,表明您做错了什么。
标签: c# multithreading concurrency task-parallel-library