[Posted]: 2018-06-30 08:55:23
[Question]:
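I am developing a web crawler that visits a page and extracts its links, looking for a specific domain. If it does not find the domain, it goes through the extracted links and repeats until it either reaches the page limit or finds the page. I am struggling to come up with sound logic that lets the bot keep queuing tasks after links are extracted, because the tasks complete so quickly that there is not enough time to submit the newly extracted links. How can I make the crawler wait until it has no more links before shutting down the executor? I have included a basic outline of my multithreaded implementation. I set the maximum number of threads to 3 and submit example.com (the seed domain) 10 times.
SpawnThread visits the site, extracts the links, and returns them as a String. My problem is that I need to take these results and put them back into the queue, but by then the queue has already finished. Any suggestions?
Update: To clarify, my problem is that when I submit the seed and get the results, I cannot get the crawler to continue searching the returned links unless I block, wait for the results, and then add them manually.
Update 2: To clarify further, I am trying to avoid blocking on future.get so that the returned results can be scheduled as tasks as they come in.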
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

int maxThreads = 3;
ExecutorService executor = Executors.newFixedThreadPool(maxThreads); // how many threads
List<Future<String>> resultList = new ArrayList<>(); // create results list
for (int i = 0; i < 10; i++) {
    SpawnThread task = new SpawnThread("example.com"); // create task (SpawnThread is a Callable<String>)
    Future<String> result = executor.submit(task); // launch task
    //System.out.println("Added " + CurrentNum + " to the queue!");
    resultList.add(result); // store task result
}
for (Future<String> future : resultList) { // loop through results
    try {
        String resultFinished = future.get(); // blocks until this task is done
        System.out.println(resultFinished);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace(); // the task itself threw an exception
    }
}
executor.shutdown(); // by this point no new links have been submitted
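One way to avoid blocking on future.get is to let each task submit the links it finds, and to count the tasks that are still in flight; the executor is shut down only when that count drops to zero. The following is a minimal sketch of that idea under stated assumptions, not a definitive implementation: `SelfFeedingCrawler`, `FAKE_WEB`, and the `.example` URLs are hypothetical names, and the in-memory map stands in for the real fetch-and-extract work that SpawnThread does.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class SelfFeedingCrawler {
    // Hypothetical link graph replacing real HTTP fetching (assumption for the sketch).
    static final Map<String, List<String>> FAKE_WEB = Map.of(
            "example.com", List.of("a.example", "b.example"),
            "a.example", List.of("b.example", "c.example"),
            "b.example", List.of("example.com"),
            "c.example", List.of());

    private final ExecutorService executor = Executors.newFixedThreadPool(3);
    private final Set<String> visited = ConcurrentHashMap.newKeySet();
    private final AtomicInteger pending = new AtomicInteger(); // submitted but unfinished tasks
    private final CountDownLatch finished = new CountDownLatch(1);
    private final int pageLimit;

    SelfFeedingCrawler(int pageLimit) {
        if (pageLimit < 1) throw new IllegalArgumentException("pageLimit must be >= 1");
        this.pageLimit = pageLimit;
    }

    // synchronized so the limit check and the "already visited" check happen atomically
    private synchronized void submit(String url) {
        if (visited.size() >= pageLimit || !visited.add(url)) return;
        pending.incrementAndGet(); // count the task before it can possibly finish
        executor.execute(() -> {
            try {
                for (String link : FAKE_WEB.getOrDefault(url, List.of())) {
                    submit(link); // children are counted before this task is uncounted
                }
            } finally {
                // the last task to finish releases the waiting caller
                if (pending.decrementAndGet() == 0) finished.countDown();
            }
        });
    }

    Set<String> crawl(String seed) {
        submit(seed);
        try {
            finished.await(); // returns once no task is queued or running
            executor.shutdown();
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(new SelfFeedingCrawler(10).crawl("example.com"));
    }
}
```

Because a task increments the counter for its children before decrementing it for itself, the counter can only reach zero when nothing is queued or running, so the caller never shuts the executor down early.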
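I think what I need is a non-blocking queue that stores the results so they can be re-added to the list that supplies new domains to crawl, but I cannot seem to get it working.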
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024); // declared but not used yet
Executor executor = Executors.newFixedThreadPool(4);
CompletionService<List<String>> completionService =
        new ExecutorCompletionService<>(executor);
List<String> pagesToVisit = new ArrayList<>();
Set<String> pagesVisited = new HashSet<>();
String seedPage = "https://example.com/";
String currentURL = null;
boolean done = false; // never set to true in this attempt
while (!done) {
    int listSize = pagesToVisit.size();
    if (pagesToVisit.isEmpty()) {
        // First pass: start from the seed page
        currentURL = seedPage;
        pagesVisited.add(seedPage);
        listSize = pagesToVisit.size() + 1;
    } else {
        currentURL = nextUrl();
    }
    // Submits the same currentURL listSize times
    for (int k = 0; k < listSize; k++) {
        completionService.submit(new Spider(currentURL, "IP", "PORT"));
    }
    int received = 0;
    boolean errors = false;
    while (received < listSize && !errors) {
        Thread.sleep(1000);
        Future<List<String>> resultFuture = completionService.take(); // blocks if none available
        try {
            List<String> result = resultFuture.get();
            pagesToVisit.addAll(result);
            received++;
        } catch (Exception e) {
            // log
            e.printStackTrace();
            errors = true;
        }
    }
}
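The CompletionService attempt above could also be driven by a single counter of tasks in flight instead of batches: take() hands back whichever task finishes first, its links are submitted immediately, and the loop ends when nothing is outstanding. This is only a sketch under assumptions: `CompletionServiceCrawler`, `extractLinks`, and `FAKE_WEB` are hypothetical names, and the in-memory map replaces the Spider class, whose behavior is not shown in the question.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CompletionServiceCrawler {
    // Hypothetical link graph standing in for real page fetching (assumption for the sketch).
    static final Map<String, List<String>> FAKE_WEB = Map.of(
            "https://example.com/", List.of("https://example.com/a", "https://example.com/b"),
            "https://example.com/a", List.of("https://example.com/b", "https://example.com/c"),
            "https://example.com/b", List.of("https://example.com/"),
            "https://example.com/c", List.of());

    // Hypothetical replacement for the Spider task: returns the links found on a page.
    static List<String> extractLinks(String url) {
        return FAKE_WEB.getOrDefault(url, List.of());
    }

    static Set<String> crawl(String seed, int pageLimit) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletionService<List<String>> completionService =
                new ExecutorCompletionService<>(executor);
        Set<String> pagesVisited = new HashSet<>(); // touched only by the calling thread
        int inFlight = 0;                           // tasks submitted but not yet taken
        try {
            if (pageLimit > 0 && pagesVisited.add(seed)) {
                completionService.submit(() -> extractLinks(seed));
                inFlight++;
            }
            while (inFlight > 0) {
                // Blocks only until any one task finishes, not a specific one
                Future<List<String>> finishedTask = completionService.take();
                inFlight--;
                List<String> links;
                try {
                    links = finishedTask.get(); // already complete, returns immediately
                } catch (ExecutionException e) {
                    links = List.of();          // a failed page contributes no links
                }
                for (String link : links) {
                    // Schedule each new link right away, respecting the page limit
                    if (pagesVisited.size() < pageLimit && pagesVisited.add(link)) {
                        completionService.submit(() -> extractLinks(link));
                        inFlight++;
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow(); // nothing is outstanding on the normal path
        }
        return pagesVisited;
    }

    public static void main(String[] args) {
        System.out.println(crawl("https://example.com/", 10));
    }
}
```

Keeping the visited set and the counter on the coordinating thread means neither needs to be thread-safe; only the link extraction runs on the pool.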
[Comments]:
Tags: java multithreading concurrency future executor