1. ForkJoin

ForkJoin 框架现实一思想是:分而治之, 是将一个单线程递归问题,用多线程来解决,从而提高处理效率。

1.1 ForkJoin 使用标准范式

高并发之-工具类
RecursiveTask 和 RecursiveAction 都是 ForkJoinTask 的继承者,前者用来处理有返回值的计算,后者没有返回值。

1.2 示例代码

用 ForkJoin 对一个大数组中所有的数求和。

public class ForkJoinSum extends RecursiveTask<Integer> {
    private static final int ThresholdValue;
    private int[] arr;
    private int fromIndex;
    private int toIndex;

    public ForkJoinSum(int[] arr, int fromIndex, int toIndex) {
        this.arr = arr;
        this.fromIndex = fromIndex;
        this.toIndex = toIndex;
    }

    protected Integer compute() {
        int sum;
        
        // 相当于递归的停止
        if (ThresholdValue >= this.toIndex - this.fromIndex) {
            sum = 0;

            try {
                for (int i = this.fromIndex; i <= this.toIndex; ++i) {
                    Thread.sleep(1L);
                    sum += this.arr[i];
                }
            } catch (InterruptedException var4) {
                var4.printStackTrace();
            }

            return sum;
        } 
        // 相当于再递归
        else {
            // 分
            sum = (this.toIndex + this.fromIndex) / 2;
            ForkJoinSum left = new ForkJoinSum(this.arr, this.fromIndex, sum);
            ForkJoinSum right = new ForkJoinSum(this.arr, sum + 1, this.toIndex);
            invokeAll(left, right);
            
            // 合
            return (Integer) left.join() + (Integer) right.join();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinSum sum = new ForkJoinSum(MakeArray.getArr(), 0, MakeArray.MaxCount - 1);
        long start = System.currentTimeMillis();
//        同步调用
        pool.invoke(sum);
//        异步调用
//        pool.execute(sum);
        long end = System.currentTimeMillis();
        System.out.println("ForkJoinSum 用时 " + (end - start) + "ms");
    }

    static {
        ThresholdValue = MakeArray.MaxCount / 10;
    }
}

2. CountDownLatch

CountDownLatch 可以处理几个线程之间的协作。使用场景如下:在 主线程 要去做一些事之前,必须启动一个或多个 辅助线程 去做一些初始化工作,并且只有在这些 辅助线程 都完成工作后,主线程 才可以再往下走。

示例代码:

// 演示CountDownLatch用法。有5个初始化线程,6个扣除点
// 扣除完毕后,主线程和业务线程继续自己的工作
public class UseCountDownLetch {

    private static CountDownLatch countDownLatch = new CountDownLatch(6);

    //    初始化线程任务
    private static class InitThread implements Runnable {

        @Override
        public void run() {
            threadPrint("工作中", 2);
            countDownLatch.countDown();
        }
    }

    //    业务线程任务
    private static class BusinessThread implements Runnable {

        @Override
        public void run() {
            try {
                countDownLatch.await();
                threadPrint("工作中", 5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
//        先开启一个线程,完成两个工作(作两次扣除)
        new Thread("FirstInit") {
            @Override
            public void run() {
                threadPrint("完成第一部分工作", 2);
                countDownLatch.countDown();
                threadPrint("完成第二部分工作", 2);
                countDownLatch.countDown();
            }
        }.start();

//        开启4个初始化线程
        for (int i = 0; i < 4; i++) {
            new Thread(new InitThread(), "初始化线程" + i).start();
        }

//        开启业务线程
        new Thread(new BusinessThread(), "业务线程").start();

//        让主线程在所有初始化线程完成之后再工作
        countDownLatch.await();
        threadPrint("工作中", 5);
    }

    private static void threadPrint(String content, int repeat) {
        for (int i = 0; i < repeat; i++) {
            System.out.println("线程:[" + Thread.currentThread().getName() + "] " + content);
        }
    }
}

3. CyclicBarrier

CyclicBarrier 用于处理一组线程的运行。场景:CyclicBarrier 就像是一扇大门,这个大门需要同时5个人才能推开,这5个人就是5个线程。当5个人同是站在门前并推门时,门才能打开5个人才能向前再走,5个线程再向下运行。

示例代码

// 演示CyclicBarrier的使用
// 在一组线程都达到屏障时,屏障才将所有线程放行
public class UseCyclicBarrier {

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new BarrierAction());
    private static ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(new BusinessThread(), "业务线程" + i).start();
        }
    }

    //    屏障打开时任务
    private static class BarrierAction implements Runnable {

        @Override
        public void run() {
            System.out.println("打印 map 开始");
            for (String key : map.keySet()) {
                System.out.print(key + "=" + map.get(key) + " ");
            }
            System.out.println("\n打印 map 结束");
        }
    }

    //    业务线程
    private static class BusinessThread implements Runnable {

        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            map.put(thread.getName(), new Random().nextInt(10));
            System.out.println(thread.getName() + " 进行第一部分工作了...");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

            System.out.println(thread.getName() + " 进行第二部分工作了...");
        }
    }
}

4. Semaphore

控制同时访问某个资源的线程数量。Semaphore 有一个构造器:
Semaphore(int permits); permits 是这个信号量的最大值,也就是同是充许这么多的线程获取证书。

获取证书的方法: acquire();如果permits>0,可以获取证书,否则不能获取,线程进入阻塞状态。

释放证书的方法:release();permits 会加1。

示例代码

简单模拟一个数据库连接池。

public class DBPoolSemaphore {

    private static Integer PoolSize = 10;
    private static LinkedList<Connection> Pool;
//    useful 表示可用连接, useless 表示已用连接
    private Semaphore useful, useless;

    public DBPoolSemaphore() {
        this.useful = new Semaphore(PoolSize);
        this.useless = new Semaphore(0);
    }

    static {
        Pool = new LinkedList<>();
        for (int i = 0; i < PoolSize; i++) {
            Pool.add(SqlConnection.fetchConnection());
        }
    }

//    从池子中拿连接
    public Connection takeConnect() throws InterruptedException {
        System.out.println("当线可用连接数 " + useful.availablePermits() + "; 不可用连接数 " + useless.availablePermits()
                + "; 有 " + useful.getQueueLength() + " 在等待连接");

//        申请证书, 可用连接-1。如果可用连接数0,那么线程就会阻塞
        useful.acquire();
        Connection connection;
        synchronized (Pool){
            connection = Pool.removeFirst();
        }
//        不可用(已用)连接+1
        useless.release();
        return connection;
    }

    public void returnConnect(Connection connection) throws InterruptedException {
        if(null != connection){
            useless.acquire();
            synchronized (Pool){
                Pool.addLast(connection);
            }
            useful.release();
        }
    }
}

5. Exchange

exchange 只能作用在两个线程之间,交换它们之间的变量。

示例代码

public class UseExchange {

    private static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args) {
        new Thread1().start();
        new Thread2().start();
    }

    //    业务线程1
    private static class Thread1 extends Thread {
        @Override
        public void run() {
            String a = "hello";
            try {
                a = exchanger.exchange(a);
                System.out.println("线程1交换后的结果是" + a); //world
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    //    业务线程2
    private static class Thread2 extends Thread {
        @Override
        public void run() {
            String a = "world";
            try {
                a = exchanger.exchange(a);
                System.out.println("线程2交换后的结果是" + a);//hello
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


    }

    
}

相关文章: