1. ForkJoin
ForkJoin 框架现实一思想是:分而治之, 是将一个单线程递归问题,用多线程来解决,从而提高处理效率。
1.1 ForkJoin 使用标准范式
RecursiveTask 和 RecursiveAction 都是 ForkJoinTask 的继承者,前者用来处理有返回值的计算,后者没有返回值。
1.2 示例代码
用 ForkJoin 对一个大数组中所有的数求和。
public class ForkJoinSum extends RecursiveTask<Integer> {
private static final int ThresholdValue;
private int[] arr;
private int fromIndex;
private int toIndex;
public ForkJoinSum(int[] arr, int fromIndex, int toIndex) {
this.arr = arr;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
protected Integer compute() {
int sum;
// 相当于递归的停止
if (ThresholdValue >= this.toIndex - this.fromIndex) {
sum = 0;
try {
for (int i = this.fromIndex; i <= this.toIndex; ++i) {
Thread.sleep(1L);
sum += this.arr[i];
}
} catch (InterruptedException var4) {
var4.printStackTrace();
}
return sum;
}
// 相当于再递归
else {
// 分
sum = (this.toIndex + this.fromIndex) / 2;
ForkJoinSum left = new ForkJoinSum(this.arr, this.fromIndex, sum);
ForkJoinSum right = new ForkJoinSum(this.arr, sum + 1, this.toIndex);
invokeAll(left, right);
// 合
return (Integer) left.join() + (Integer) right.join();
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinSum sum = new ForkJoinSum(MakeArray.getArr(), 0, MakeArray.MaxCount - 1);
long start = System.currentTimeMillis();
// 同步调用
pool.invoke(sum);
// 异步调用
// pool.execute(sum);
long end = System.currentTimeMillis();
System.out.println("ForkJoinSum 用时 " + (end - start) + "ms");
}
static {
ThresholdValue = MakeArray.MaxCount / 10;
}
}
2. CountDownLatch
CountDownLatch 可以处理几个线程之间的协作。使用场景如下:在 主线程 要去做一些事之前,必须启动一个或多个 辅助线程 去做一些初始化工作,并且只有在这些 辅助线程 都完成工作后,主线程 才可以再往下走。
示例代码:
// 演示CountDownLatch用法。有5个初始化线程,6个扣除点
// 扣除完毕后,主线程和业务线程继续自己的工作
public class UseCountDownLetch {
private static CountDownLatch countDownLatch = new CountDownLatch(6);
// 初始化线程任务
private static class InitThread implements Runnable {
@Override
public void run() {
threadPrint("工作中", 2);
countDownLatch.countDown();
}
}
// 业务线程任务
private static class BusinessThread implements Runnable {
@Override
public void run() {
try {
countDownLatch.await();
threadPrint("工作中", 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
// 先开启一个线程,完成两个工作(作两次扣除)
new Thread("FirstInit") {
@Override
public void run() {
threadPrint("完成第一部分工作", 2);
countDownLatch.countDown();
threadPrint("完成第二部分工作", 2);
countDownLatch.countDown();
}
}.start();
// 开启4个初始化线程
for (int i = 0; i < 4; i++) {
new Thread(new InitThread(), "初始化线程" + i).start();
}
// 开启业务线程
new Thread(new BusinessThread(), "业务线程").start();
// 让主线程在所有初始化线程完成之后再工作
countDownLatch.await();
threadPrint("工作中", 5);
}
private static void threadPrint(String content, int repeat) {
for (int i = 0; i < repeat; i++) {
System.out.println("线程:[" + Thread.currentThread().getName() + "] " + content);
}
}
}
3. CyclicBarrier
CyclicBarrier 用于处理一组线程的运行。场景:CyclicBarrier 就像是一扇大门,这个大门需要同时5个人才能推开,这5个人就是5个线程。当5个人同是站在门前并推门时,门才能打开5个人才能向前再走,5个线程再向下运行。
示例代码
// 演示CyclicBarrier的使用
// 在一组线程都达到屏障时,屏障才将所有线程放行
public class UseCyclicBarrier {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new BarrierAction());
private static ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(new BusinessThread(), "业务线程" + i).start();
}
}
// 屏障打开时任务
private static class BarrierAction implements Runnable {
@Override
public void run() {
System.out.println("打印 map 开始");
for (String key : map.keySet()) {
System.out.print(key + "=" + map.get(key) + " ");
}
System.out.println("\n打印 map 结束");
}
}
// 业务线程
private static class BusinessThread implements Runnable {
@Override
public void run() {
Thread thread = Thread.currentThread();
map.put(thread.getName(), new Random().nextInt(10));
System.out.println(thread.getName() + " 进行第一部分工作了...");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(thread.getName() + " 进行第二部分工作了...");
}
}
}
4. Semaphore
控制同时访问某个资源的线程数量。Semaphore 有一个构造器:
Semaphore(int permits); permits 是这个信号量的最大值,也就是同是充许这么多的线程获取证书。
获取证书的方法: acquire();如果permits>0,可以获取证书,否则不能获取,线程进入阻塞状态。
释放证书的方法:release();permits 会加1。
示例代码
简单模拟一个数据库连接池。
public class DBPoolSemaphore {
private static Integer PoolSize = 10;
private static LinkedList<Connection> Pool;
// useful 表示可用连接, useless 表示已用连接
private Semaphore useful, useless;
public DBPoolSemaphore() {
this.useful = new Semaphore(PoolSize);
this.useless = new Semaphore(0);
}
static {
Pool = new LinkedList<>();
for (int i = 0; i < PoolSize; i++) {
Pool.add(SqlConnection.fetchConnection());
}
}
// 从池子中拿连接
public Connection takeConnect() throws InterruptedException {
System.out.println("当线可用连接数 " + useful.availablePermits() + "; 不可用连接数 " + useless.availablePermits()
+ "; 有 " + useful.getQueueLength() + " 在等待连接");
// 申请证书, 可用连接-1。如果可用连接数0,那么线程就会阻塞
useful.acquire();
Connection connection;
synchronized (Pool){
connection = Pool.removeFirst();
}
// 不可用(已用)连接+1
useless.release();
return connection;
}
public void returnConnect(Connection connection) throws InterruptedException {
if(null != connection){
useless.acquire();
synchronized (Pool){
Pool.addLast(connection);
}
useful.release();
}
}
}
5. Exchange
exchange 只能作用在两个线程之间,交换它们之间的变量。
示例代码
public class UseExchange {
private static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread1().start();
new Thread2().start();
}
// 业务线程1
private static class Thread1 extends Thread {
@Override
public void run() {
String a = "hello";
try {
a = exchanger.exchange(a);
System.out.println("线程1交换后的结果是" + a); //world
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 业务线程2
private static class Thread2 extends Thread {
@Override
public void run() {
String a = "world";
try {
a = exchanger.exchange(a);
System.out.println("线程2交换后的结果是" + a);//hello
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}