【Question Title】: Make existing Java code parallel/multithreaded
【Posted】: 2011-05-14 22:17:13
【Question】:

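I have a very simple crawler. I want to make my current code run in several threads. Could you point me to a short tutorial or article to help me with this?

I'm originally a .NET developer, and in .NET I have no problem running code in multiple threads, but unfortunately I don't know anything about threads in Java.

My crawler is a command-line application, so there is no GUI to worry about.

Thank you in advance.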

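【Comments】:

  • Are you familiar with the concept of locks? (I don't know .NET.)
  • Does your code use some kind of queue to keep track of the locations that still have to be crawled?
  • @Sword22: Yes. @Dilum: No, because it is a generic crawler; it uses an object that implements an interface with a .next() function, which returns a special document for the crawler.

Tags: java multithreading thread-safety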


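【Answer 1】:

Java does multithreading through the Thread class. One of the most common ways to make existing code multithreaded is to use the Runnable interface to define what you want to call when the thread starts, and then start it.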

public class SomeFunctions
{
  public static void FunctionA() {}
  public static void FunctionB() {}
  public static void FunctionC() {}
}

// ...
Thread t1 = new Thread(new Runnable() {
   public void run() {
      SomeFunctions.FunctionA();
   }
});
t1.start();

// (rinse and repeat for the other functions)

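Dry-coded, but it should at least get the general concept across. Of course, as soon as you enter multithreading land you run into concurrency issues and need to make sure everything is appropriately synchronized, but any language will have those issues.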
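The "rinse and repeat" step can be sketched roughly as follows. This is only an illustration, not code from the answer: the class name StartAndJoinDemo, the runAll helper, and the trivial task bodies are made up. It starts one thread per Runnable and then calls join() on each so that the caller waits until all of them have finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names here are illustrative, not from the answer.
class StartAndJoinDemo {

    // Starts one thread per task, then blocks until every thread has finished.
    static void runAll(List<Runnable> tasks) throws InterruptedException {
        List<Thread> threads = new ArrayList<Thread>();
        for (Runnable task : tasks) {
            Thread t = new Thread(task);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // wait for this thread to terminate
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger finished = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<Runnable>();
        for (int i = 0; i < 3; i++) {
            tasks.add(new Runnable() {
                public void run() {
                    // Stand-in for SomeFunctions.FunctionA() and friends.
                    finished.incrementAndGet();
                }
            });
        }
        runAll(tasks);
        System.out.println("finished tasks: " + finished.get()); // prints "finished tasks: 3"
    }
}
```

Without the join() calls, main could reach the println before the worker threads have run.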

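If you're worried about synchronization, you have a few tools at your disposal. The easiest is the reentrant (recursive) mutex functionality built into Java, the synchronized keyword. More classical means are also available through the various classes in the java.util.concurrent and java.util.concurrent.locks packages, such as Semaphore and ReadWriteLock.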

http://download.oracle.com/javase/6/docs/api/java/util/concurrent/package-summary.html
http://download.oracle.com/javase/6/docs/api/java/util/concurrent/locks/package-summary.html
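To make the synchronized keyword concrete, here is a small sketch under my own assumptions: the class SynchronizedCounter and the helper countWith are hypothetical names, not part of the answer. Several threads increment a shared counter; because increment() is synchronized, no updates are lost.

```java
// Hypothetical example class; not part of the original answer.
class SynchronizedCounter {

    private int count = 0;

    // synchronized: only one thread at a time may run this method on a given instance.
    public synchronized void increment() {
        count++;
    }

    public synchronized int get() {
        return count;
    }

    // Runs `threads` threads that each call increment() `perThread` times.
    static int countWith(int threads, final int perThread) throws InterruptedException {
        final SynchronizedCounter counter = new SynchronizedCounter();
        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < perThread; j++) {
                        counter.increment();
                    }
                }
            });
            pool[i].start();
        }
        for (Thread t : pool) {
            t.join(); // wait until every increment has happened
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWith(4, 10000)); // prints 40000
    }
}
```

If you remove synchronized from increment(), count++ is no longer atomic and the printed total can come out lower than 40000.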

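【Comments】:

  • +1 @Shirik Very good advice and answer. Synchronization is the thing you should focus on; you will definitely have to deal with it.
  • Yes, great answer. I also found this tutorial very helpful: Multithreading in Java

【Answer 2】:

You can take a look at my web crawler example. Sorry for the length.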

import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * A web crawler with a Worker pool
 * 
 * @author Adriaan
 */
public class WebCrawler implements Manager {

        private Set<Worker> workers = new HashSet<Worker>();
        private List<String> toCrawl = new ArrayList<String>();
        private Set<String> crawled = new HashSet<String>();
        private Set<String> hosts = new HashSet<String>();
        private Set<String> results = new HashSet<String>();
        private int maxResults;

        public WebCrawler(String url, int numberOfWorkers, int maxResults) {
                this.maxResults = maxResults;
                toCrawl.add(url);
                createWorkers(numberOfWorkers);
        }

        public void createWorkers(int numberOfWorkers) {
                for (int i = 0; i < numberOfWorkers; i++) {
                        workers.add(new Worker(this));
                }
        }

        private void stopWorkers() {
                for (Worker worker : workers) {
                        worker.terminate();
                }
        }

        public synchronized Job getNewJob() {
                while (toCrawl.size() == 0) {
                        try {
                                wait();
                        } catch (InterruptedException e) {
                                // ignore
                        }
                }
                return new EmailAddressCrawlJob().setDescription(toCrawl.remove(0));
        }

        public synchronized void jobCompleted(Job job) {
                // System.out.println("crawled: " + job.getDescription());
                crawled.add(job.getDescription());
                String host = getHost(job.getDescription());
                boolean knownHost = hosts.contains(host);
                if (!knownHost) {
                        System.out.println("host: " + host);
                        hosts.add(host);
                }
                for (String url : job.getNewDescriptions()) {
                        if (!crawled.contains(url)) {
                                if (knownHost) {
                                        // Guard against an empty list: size() - 1 would be -1 and throw
                                        toCrawl.add(Math.max(0, toCrawl.size() - 1), url);
                                } else {
                                        toCrawl.add(url);
                                }
                        }
                }
                for (String result : job.getResults()) {
                        if (results.add(result)) {
                                System.out.println("result: " + result);
                        }
                }
                notifyAll();
                if (results.size() >= maxResults) {
                        stopWorkers();
                        System.out.println("Crawled hosts:");
                        for (String crawledHost : hosts) {
                                System.out.println(crawledHost);
                        }
                        Set<String> uncrawledHosts = new HashSet<String>();
                        for (String toCrawlUrl : toCrawl) {
                                uncrawledHosts.add(getHost(toCrawlUrl));
                        }
                        System.out.println("Uncrawled hosts:");
                        for (String unCrawledHost : uncrawledHosts) {
                                System.out.println(unCrawledHost);
                        }
                }
                if (crawled.size() % 10 == 0) {
                        System.out.println("crawled=" + crawled.size() + " toCrawl="
                                        + toCrawl.size() + " results=" + results.size() + " hosts="
                                        + hosts.size() + " lastHost=" + host);
                }
        }

        public String getHost(String host) {
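                // indexOf returns -1 when there is no "://"; check that before adding the offset
                int schemeEnd = host.indexOf("://");
                if (schemeEnd >= 0) {
                        int hostStart = schemeEnd + 3;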
                        int hostEnd = host.indexOf("/", hostStart);
                        if (hostEnd < 0) {
                                hostEnd = host.length();
                        }
                        host = host.substring(hostStart, hostEnd);
                }
                return host;
        }

        public static void main(String[] args) throws MalformedURLException {
                new WebCrawler("http://www.nu.nl/", 5, 20);
        }
}

Worker

/**
 * A Worker proactively gets a Job, executes it and notifies its manager that
 * the Job is completed.
 * 
 * @author Adriaan
 */
public class Worker extends Thread {

        private final Manager manager;
        private Job job = null;
        // volatile so that terminate(), called from another thread, is seen by run()
        private volatile boolean isWorking;

        public Worker(Manager manager) {
                this.manager = manager;
                isWorking = true;
                start();
        }

        @Override
        public void run() {
                System.out.println("Worker " + Thread.currentThread().getId()
                                + " starting ");
                while (isWorking) {
                        job = manager.getNewJob();
                        job.execute();
                        manager.jobCompleted(job);
                }
        }

        public void terminate() {
                isWorking = false;
        }
}

Manager interface

/**
 * Manager interface for Workers
 * 
 * @author Adriaan
 */
public interface Manager {

        /**
         * Gets a new job
         * 
         * @return
         */
        public Job getNewJob();

        /**
         * Indicates the job is completed
         * 
         * @param job
         */
        public void jobCompleted(Job job);
}

Job

import java.util.HashSet;
import java.util.Set;

/**
 * A Job is a unit of work defined by a String (the description). During execution the 
 * job can obtain results and new job descriptions.
 *
 * @author Adriaan
 */
public abstract class Job {

        private String description;
        private Set<String> results = new HashSet<String>();
        private Set<String> newDescriptions = new HashSet<String>();

        /**
         * Sets the job description
         * 
         * @param description
         * @return this for chaining
         */
        public Job setDescription(String description) {
                this.description = description;
                return this;
        }

        /**
         * Executes the job
         */
        public abstract void execute();

        /**
         * Gets the results obtained
         * 
         * @return
         */
        public Set<String> getResults() {
                return results;
        }

        /**
         * Gets the new job descriptions obtained
         * 
         * @return
         */
        public Set<String> getNewDescriptions() {
                return newDescriptions;
        }

        /**
         * Gets the job description
         * 
         * @return
         */
        public String getDescription() {
                return description;
        }

        /**
         * Allows the implementation to add an obtained result
         * 
         * @param result
         */
        void addResult(String result) {
                results.add(result);
        }

        /**
         * Allows the implementation to add an obtained description
         * 
         * @param newDescription
         */
        void addNewDescription(String newDescription) {
                newDescriptions.add(newDescription);
        }
}

A Job that crawls pages for email addresses:

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A Job which crawls HTTP or HTTPS URLs for email addresses, collecting new
 * URLs to crawl along the way.
 * 
 * @author Adriaan
 */
public class EmailAddressCrawlJob extends Job {

        @Override
        public void execute() {
                try {
                        URL url = new URL(getDescription());
                        if (url != null) {
                                String text = readText(url);
                                extractNewDescriptions(text, url);
                                extractResults(text);
                        }
                } catch (MalformedURLException e) {
                        System.err.println("Bad url " + getDescription());
                }
        }

        private String readText(URL url) {
                URLConnection connection;
                try {
                        connection = url.openConnection();
                        InputStream input = connection.getInputStream();
                        byte[] buffer = new byte[1000];
                        int num = input.read(buffer);
                        if (num > 0) {
                                StringBuilder builder = new StringBuilder();
                                builder.append(new String(buffer, 0, num));
                                while (num != -1) {
                                        num = input.read(buffer);
                                        if (num != -1) {
                                                builder.append(new String(buffer, 0, num));
                                        }
                                }
                                return builder.toString();
                        }
                } catch (IOException e) {
                        //System.err.println("Could not read from " + url);
                }
                return "";
        }

        private void extractNewDescriptions(String text, URL url) {

                // URL extracting code from Sun example
                String lowerCaseContent = text.toLowerCase();
                int index = 0;
                while ((index = lowerCaseContent.indexOf("<a", index)) != -1) {

                        if ((index = lowerCaseContent.indexOf("href", index)) == -1) {
                                break;
                        }

                        if ((index = lowerCaseContent.indexOf("=", index)) == -1) {
                                break;
                        }

                        index++;
                        String remaining = text.substring(index);
                        StringTokenizer st = new StringTokenizer(remaining, "\t\n\r\">#");
                        String strLink = st.nextToken();

                        if (strLink.startsWith("javascript:")) {
                                continue;
                        }

                        URL urlLink;
                        try {
                                urlLink = new URL(url, strLink);
                                strLink = urlLink.toString();
                        } catch (MalformedURLException e) {
                                // System.err.println("Could not create url: " + target
                                // + " + " + strLink);
                                continue;
                        }
                        // only look at http links
                        String protocol = urlLink.getProtocol();
                        if (protocol.compareTo("http") != 0
                                        && protocol.compareTo("https") != 0) {
                                // System.err.println("Ignoring: " + protocol
                                // + " protocol in " + urlLink);
                                continue;
                        }
                        addNewDescription(urlLink.toString());
                }
        }

        private void extractResults(String text) {
                Pattern p = Pattern
                                .compile("([\\w\\-]([\\.\\w])+[\\w]+@([\\w\\-]+\\.)+[A-Za-z]{2,4})");
                Matcher m = p.matcher(text);
                while (m.find()) {
                        addResult(m.group(1));
                }
        }
}

I know this answer is a bit long, but I figured the OP would be best helped by a working example, and I happened to have written one not long ago.
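For comparison, the hand-written wait()/notifyAll() queue in WebCrawler can also be expressed with a BlockingQueue from java.util.concurrent. The sketch below is a different technique, not a rewrite of the crawler above: the class BlockingQueueWorkers, the process method, and the STOP sentinel are hypothetical, and it assumes the list of URLs is fixed up front (a real crawler keeps discovering new ones, which makes shutdown harder).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; class and method names are illustrative only.
class BlockingQueueWorkers {

    // Sentinel value that tells a worker to stop (assumed never to be a real URL).
    private static final String STOP = "<stop>";

    static Set<String> process(List<String> urls, int numberOfWorkers)
            throws InterruptedException {
        final BlockingQueue<String> toCrawl = new LinkedBlockingQueue<String>(urls);
        final Set<String> crawled = Collections.synchronizedSet(new HashSet<String>());

        // One STOP per worker, queued behind the real work (the queue is FIFO).
        for (int i = 0; i < numberOfWorkers; i++) {
            toCrawl.put(STOP);
        }

        Thread[] workers = new Thread[numberOfWorkers];
        for (int i = 0; i < numberOfWorkers; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            String url = toCrawl.take(); // blocks while the queue is empty
                            if (STOP.equals(url)) {
                                return;
                            }
                            crawled.add(url); // placeholder for the real crawling work
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt and exit
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return crawled;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> crawled = process(
                Arrays.asList("http://example.com/a", "http://example.com/b"), 3);
        System.out.println("crawled " + crawled.size() + " URLs"); // prints "crawled 2 URLs"
    }
}
```

Here take() does the blocking that the while/wait() loop in getNewJob() does by hand, and the sentinel gives every worker a way to leave its loop.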

【Comments】:

    【Answer 3】:

    A very basic Java program that gives a rough idea of multithreading:

    public class MyThread extends Thread {
    
      String word;
    
      public MyThread(String rm){
        word = rm;
      }
    
      public void run(){
    
        try {
    
          for(;;){
            System.out.println(word);
            Thread.sleep(1000);
          }
    
        } catch(InterruptedException e) {
    
          System.out.println("sleep interrupted");      
        }
      }
    
      public static void main(String[] args) {
    
        Thread t1=new MyThread("First Thread");
        Thread t2=new MyThread("Second Thread");
        t1.start();
        t2.start();
      }
    } 
    

    The output will look something like this (the exact interleaving of the two threads is not guaranteed):

    First Thread
    Second Thread
    First Thread
    Second Thread
    First Thread
    

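    Take a look at this presentation; it will help you understand the basics:

    Here

    【Comments】:

    • I wouldn't use an endless for loop. Maybe add a counter so the loop runs 10 times? (10 is just an example.)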
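A bounded version along the lines of that comment might look like the sketch below. CountedThread and its extra constructor parameters are hypothetical, not the answerer's code; the loop runs a fixed number of times and main waits for both threads with join().

```java
// Hypothetical variant of MyThread; the extra constructor parameters are illustrative.
class CountedThread extends Thread {

    private final String word;
    private final int repetitions;
    private final long pauseMillis;
    private int printed = 0; // read by other threads only after join()

    CountedThread(String word, int repetitions, long pauseMillis) {
        this.word = word;
        this.repetitions = repetitions;
        this.pauseMillis = pauseMillis;
    }

    @Override
    public void run() {
        try {
            // Bounded loop instead of for(;;): the thread ends on its own.
            for (int i = 0; i < repetitions; i++) {
                System.out.println(word);
                printed++;
                Thread.sleep(pauseMillis);
            }
        } catch (InterruptedException e) {
            System.out.println("sleep interrupted");
        }
    }

    int getPrinted() {
        return printed;
    }

    public static void main(String[] args) throws InterruptedException {
        CountedThread t1 = new CountedThread("First Thread", 10, 1000);
        CountedThread t2 = new CountedThread("Second Thread", 10, 1000);
        t1.start();
        t2.start();
        t1.join(); // wait for both threads before exiting main
        t2.join();
    }
}
```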