【问题标题】:Submit new task to executor after worker finishes工人完成后向执行者提交新任务
【发布时间】:2018-06-30 08:55:23
【问题描述】:

我正在开发一个网络爬虫,它访问一个页面并提取链接以查找特定域,如果它没有找到它,它会查看提取的链接并重复,直到它达到页面限制或找到页面。我发现自己很难想出合理的逻辑来让机器人在提取链接后继续对任务进行排队,因为任务正在快速完成并且没有足够的时间来提交新提取的链接。在关闭执行程序之前,我怎样才能实现爬虫等到它没有更多链接?我已经包含了我的多线程实现的基本概述。我将最大线程数设置为 3,并提交 example.com 10 次(种子域)

Spawn Thread 访问该站点并提取链接,然后将它们返回为字符串。我的问题是我需要能够获取这些结果,然后将它们放入队列中。但是到那时队列已经结束了。有什么建议么?

更新所以澄清一下,我的问题是当我提交种子并获得结果时,我无法让它继续搜索返回的种子。 除非我阻止并等待结果,然后手动添加它们。

更新 2 为了进一步澄清,我试图防止在 future.get 上发生阻塞,以便我可以添加返回的结果,因为它们 来被安排为任务。

            int MaxThreads = 3;
            ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(MaxThreads); // How many threads
            List<Future<String>> resultList = new ArrayList<>();// Create results list

            for (int i = 0; i < 10; i ++) {
                SpawnThread task  = new SpawnThread("example.com");// Create Tasks
                Future<String> result = executor.submit(task);//Launch tasks
                //System.out.println("Added " + CurrentNum + " to the que!");
                resultList.add(result);//Store Task Result
            }

             for(Future<String> future : resultList) //Loop through results
                {
                    String resultfinished;
                    try {
                        resultfinished = future.get();
                        System.out.println(resultfinished);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                }
            executor.shutdown();

我认为我需要的是一个非阻塞队列来存储结果 重新添加到提供新域进行爬网的列表中,但是 我似乎无法让它工作。

BlockingQueue queue = new ArrayBlockingQueue(1024);
        Executor executor = Executors.newFixedThreadPool(4);
        CompletionService<List<String>> completionService = 
                 new ExecutorCompletionService<List<String>>(executor);
        List<String> pagesToVisit = new ArrayList<String>();
        Set<String> pagesVisited = new HashSet<String>();

        String SeedPage = "https://example.com/";
        String currentURL = null;

        boolean done = false;
        while(!done) {

             int listsize = pagesToVisit.size();
             if(pagesToVisit.isEmpty())
             {
                 currentURL = SeedPage;
                 pagesVisited.add(SeedPage);
                 listsize = pagesToVisit.size() + 1;
              }
             else
             {
                 currentURL = nextUrl();
             }


             for(int k = 0; k < listsize; k ++)
             {

                 completionService.submit(new Spider(currentURL,"IP","PORT" ) {
                 });
             }

              int received = 0;
              boolean errors = false;
              while(received < listsize  && !errors)
              {
                  Thread.sleep(1000);
                  Future<List<String>> resultFuture = completionService.take(); //blocks if none available
                  try
                  {
                      List<String> result = resultFuture.get();
                      pagesToVisit.addAll(result);
                      received ++; 
                  }
                  catch(Exception e)
                  {
                               //log
                            e.printStackTrace();
                            errors = true;
                  }
              }

          }

【问题讨论】:

    标签: java multithreading concurrency future executor


    【解决方案1】:

    我不确定我的问题是否正确,但是

    你可以使用awaitTermination();方法

    公共布尔等待终止(长时间超时, 时间单位) 抛出 InterruptedException

    在关机后阻塞直到所有任务都完成执行 请求,或者发生超时,或者当前线程被中断, 以先发生者为准。

    参数:timeout - 最大等待时间 unit - 时间单位 超时参数

    返回:如果此执行程序终止,则返回 true;如果超时,则返回 false 终止前已过

    抛出:InterruptedException - 如果在等待时被中断

    例如

    try{
    executor.awaitTermination(5, TimeUnit.Seconds);
    }catch(InterruptedException e)
    {
    // Catch block
    }
    

    shutdown() 方法不等待线程完成

    启动有序关闭,其中执行先前提交的任务,但不会接受新任务。如果已经关闭,调用没有额外的效果。 该方法不等待之前提交的任务完成执行。

    【讨论】:

    • 那么我可以提交种子,然后提交返回的链接并执行 x 次吗?阻止不会阻止提交新任务吗?
    • 使用shutdown后你无法生成新线程,执行一次awaitTermination()就足够了,因为它在它执行的整个线程池上执行了它的操作
    • 不确定这是我要找的。我正在寻找一种方法来提交 1 个任务等待结果是 List(所有提取的链接)。从那里我需要重新提交列表中的每个字符串作为一项任务,以查找将作为 List 返回的更多链接。此过程将继续进行,直到达到 MaxPage 计数或用完链接以继续提取新链接。我让它访问种子页面以获取 List 但它没有看到它需要爬过的列表中的新项目并因此停止。
    • 那么我想你会在 ExecutorService 以外的地方寻找其他地方,尝试使用 Thread 类及其 join 方法我会说
    猜你喜欢
    • 2019-04-29
    • 2017-01-18
    • 1970-01-01
    • 2019-01-23
    • 2018-03-25
    • 1970-01-01
    • 2012-09-11
    • 2015-04-29
    • 1970-01-01
    相关资源
    最近更新 更多