工作之余充电:https://www.bilibili.com/video/BV16J411h7Rd?p=1
笔记待更新
1.4 预备知识
logback.xml 配置如下
2. 进程与线程
- 进程和线程的概念
- 并行和并发的概念
- 线程基本应用
2.1 进程与线程
- 程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的
- 当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
- 进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器
- 等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)
- 一个进程之内可以分为一到多个线程。
- 一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行
- Java 中,线程作为最小调度单位,进程作为资源分配的最小单位。 在 windows 中进程是不活动的,只是作为线程的容器
二者对比
- 进程基本上相互独立的,而线程存在于进程内,是进程的一个子集
- 进程拥有共享的资源,如内存空间等,供其内部的线程共享
- 进程间通信较为复杂
- 线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
- 线程更轻量,线程上下文切换成本一般上要比进程上下文切换低
2.2 并行与并发
- 并发(concurrent)是同一时间应对(dealing with)多件事情的能力
- 并行(parallel)是同一时间动手做(doing)多件事情的能力
- 家庭主妇做饭、打扫卫生、给孩子喂奶,她一个人轮流交替做这多件事,这时就是并发
- 家庭主妇雇了个保姆,她们一起这些事,这时既有并发,也有并行(这时会产生竞争,例如锅只有一口,一
- 雇了3个保姆,一个专做饭、一个专打扫卫生、一个专喂奶,互不干扰,这时是并行
2.3 应用
- 需要等待结果返回,才能继续运行就是同步
- 不需要等待结果返回,就能继续运行就是异步
- 比如在项目中,视频文件需要转换格式等操作比较费时,这时开一个新线程处理视频转换,避免阻塞主线程
- tomcat 的异步 servlet 也是类似的目的,让用户线程处理耗时较长的操作,避免阻塞 tomcat 的工作线程
- ui 程序中,开线程进行其他操作,避免阻塞 ui 线程
计算 1 花费 10 ms
计算 2 花费 11 ms
计算 3 花费 9 ms
汇总需要 1 ms
- 如果是串行执行,那么总共花费的时间是 10 + 11 + 9 + 1 = 31ms
- 但如果是四核 cpu,各个核心分别使用线程 1 执行计算 1,线程 2 执行计算 2,线程 3 执行计算 3,那么 3 个
- 有些任务,经过精心设计,将任务拆分,并行执行,当然可以提高程序的运行效率。但不是所有计算任务都能拆分(参考后文的【阿姆达尔定律】)
- 也不是所有任务都需要拆分,任务的目的如果不同,谈拆分和效率没啥意义
3. Java 线程
- 创建和运行线程
- 查看线程
- 线程 API
- 线程状态
3.1 创建和运行线程
// 创建线程对象 Thread t = new Thread() { public void run() { // 要执行的任务 } }; // 启动线程 t.start();
// 构造方法的参数是给线程指定名字,推荐 Thread t1 = new Thread("t1") { @Override // run 方法内实现了要执行的任务 public void run() { log.debug("hello"); } }; t1.start();
19:19:00 [t1] c.ThreadStarter - hello
方法二,使用 Runnable 配合 Thread
- Thread 代表线程
- Runnable 可运行的任务(线程要执行的代码)
Runnable runnable = new Runnable() { public void run(){ // 要执行的任务 } }; // 创建线程对象 Thread t = new Thread( runnable ); // 启动线程 t.start();
// 创建任务对象 Runnable task2 = new Runnable() { @Override public void run() { log.debug("hello"); } }; // 参数1 是任务对象; 参数2 是线程名字,推荐 Thread t2 = new Thread(task2, "t2"); t2.start();
19:19:00 [t2] c.ThreadStarter - hello
// 创建任务对象 Runnable task2 = () -> log.debug("hello"); // 参数1 是任务对象; 参数2 是线程名字,推荐 Thread t2 = new Thread(task2, "t2"); t2.start();
* 原理之 Thread 与 Runnable 的关系
- 方法1 是把线程和任务合并在了一起,方法2 是把线程和任务分开了
- 用 Runnable 更容易与线程池等高级 API 配合
- 用 Runnable 让任务类脱离了 Thread 继承体系,更灵活
// 创建任务对象 FutureTask<Integer> task3 = new FutureTask<>(() -> { log.debug("hello"); return 100; }); // 参数1 是任务对象; 参数2 是线程名字,推荐 new Thread(task3, "t3").start(); // 主线程阻塞,同步等待 task 执行完毕的结果 Integer result = task3.get(); log.debug("结果是:{}", result);
19:22:27 [t3] c.ThreadStarter - hello
19:22:27 [main] c.ThreadStarter - 结果是:100
3.2 观察多个线程同时运行
- 交替执行
- 谁先谁后,不由我们控制
3.3 查看进程线程的方法
- 任务管理器可以查看进程和线程数,也可以用来杀死进程
- tasklist 查看进程
- taskkill 杀死进程
- ps -fe 查看所有进程
- ps -fT -p <PID> 查看某个进程(PID)的所有线程
- kill 杀死进程
- top 按大写 H 切换是否显示线程
- top -H -p <PID> 查看某个进程(PID)的所有线程
- jps 命令查看所有 Java 进程
- jstack <PID> 查看某个 Java 进程(PID)的所有线程状态
- jconsole 来查看某个 Java 进程中线程的运行情况(图形界面)
- 需要以如下方式运行你的 java 类
java -Djava.rmi.server.hostname=`ip地址` -Dcom.sun.management.jmxremote - Dcom.sun.management.jmxremote.port=`连接端口` -Dcom.sun.management.jmxremote.ssl=是否安全连接 - Dcom.sun.management.jmxremote.authenticate=是否认证 java类
- 修改 /etc/hosts 文件将 127.0.0.1 映射至主机名如果要认证访问,还需要做如下步骤
- 复制 jmxremote.password 文件
- 修改 jmxremote.password 和 jmxremote.access 文件的权限为 600 即文件所有者可读写
- 连接时填入 controlRole(用户名),R&D(密码)
3.4 * 原理之线程运行
- 每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存
- 每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法
- 线程的 cpu 时间片用完
- 垃圾回收
- 有更高优先级的线程需要运行
- 线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
- 状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
- Context Switch 频繁发生会影响性能
3.5 常见方法
3.6 start 与 run
public static void main(String[] args) { Thread t1 = new Thread("t1") { @Override public void run() { log.debug(Thread.currentThread().getName()); FileReader.read(Constants.MP4_FULL_PATH); } }; t1.run(); log.debug("do other things ..."); }
19:39:14 [main] c.TestStart - main 19:39:14 [main] c.FileReader - read [1.mp4] start ... 19:39:18 [main] c.FileReader - read [1.mp4] end ... cost: 4227 ms 19:39:18 [main] c.TestStart - do other things ...
t1.start();
19:41:30 [main] c.TestStart - do other things ... 19:41:30 [t1] c.TestStart - t1 19:41:30 [t1] c.FileReader - read [1.mp4] start ... 19:41:35 [t1] c.FileReader - read [1.mp4] end ... cost: 4542 ms
- 直接调用 run 是在主线程中执行了 run,没有启动新的线程
- 使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码
3.7 sleep 与 yield
- 1. 调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)
- 2. 其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
- 3. 睡眠结束后的线程未必会立刻得到执行
- 4. 建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性
- 1. 调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行其它线程
- 2. 具体的实现依赖于操作系统的任务调度器
- 线程优先级会提示(hint)调度器优先调度该线程,但它仅仅是一个提示,调度器可以忽略它
- 如果 cpu 比较忙,那么优先级高的线程会获得更多的时间片,但 cpu 闲时,优先级几乎没作用
Runnable task1 = () -> { int count = 0; for (;;) { System.out.println("---->1 " + count++); } }; Runnable task2 = () -> { int count = 0; for (;;) { // Thread.yield(); System.out.println(" ---->2 " + count++); } }; Thread t1 = new Thread(task1, "t1"); Thread t2 = new Thread(task2, "t2"); // t1.setPriority(Thread.MIN_PRIORITY); // t2.setPriority(Thread.MAX_PRIORITY); t1.start(); t2.start();
3.8 join 方法详解
static int r = 0; public static void main(String[] args) throws InterruptedException { test1(); } private static void test1() throws InterruptedException { log.debug("开始"); Thread t1 = new Thread(() -> { log.debug("开始"); sleep(1); log.debug("结束"); r = 10; }); t1.start(); log.debug("结果为:{}", r); log.debug("结束"); }
- 因为主线程和线程 t1 是并行执行的,t1 线程需要 1 秒之后才能算出 r=10
- 而主线程一开始就要打印 r 的结果,所以只能打印出 r=0
- 用 sleep 行不行?为什么?
- 用 join,加在 t1.start() 之后即可
- 需要等待结果返回,才能继续运行就是同步
- 不需要等待结果返回,就能继续运行就是异步
static int r1 = 0; static int r2 = 0; public static void main(String[] args) throws InterruptedException { test2(); } private static void test2() throws InterruptedException { Thread t1 = new Thread(() -> { sleep(1); r1 = 10; }); Thread t2 = new Thread(() -> { sleep(2); r2 = 20; }); long start = System.currentTimeMillis(); t1.start(); t2.start(); t1.join(); t2.join(); long end = System.currentTimeMillis(); log.debug("r1: {} r2: {} cost: {}", r1, r2, end - start); }
- 第一个 join:等待 t1 时, t2 并没有停止, 而在运行
- 第二个 join:1s 后, 执行到此, t2 也运行了 1s, 因此也只需再等待 1s
20:45:43.239 [main] c.TestJoin - r1: 10 r2: 20 cost: 2005
static int r1 = 0; static int r2 = 0; public static void main(String[] args) throws InterruptedException { test3(); } public static void test3() throws InterruptedException { Thread t1 = new Thread(() -> { sleep(1); r1 = 10; }); long start = System.currentTimeMillis(); t1.start(); // 线程执行结束会导致 join 结束 t1.join(1500); long end = System.currentTimeMillis(); log.debug("r1: {} r2: {} cost: {}", r1, r2, end - start); }
20:48:01.320 [main] c.TestJoin - r1: 10 r2: 0 cost: 1010
static int r1 = 0; static int r2 = 0; public static void main(String[] args) throws InterruptedException { test3(); } public static void test3() throws InterruptedException { Thread t1 = new Thread(() -> { sleep(2); r1 = 10; }); long start = System.currentTimeMillis(); t1.start(); // 线程执行结束会导致 join 结束 t1.join(1500); long end = System.currentTimeMillis(); log.debug("r1: {} r2: {} cost: {}", r1, r2, end - start); }
20:52:15.623 [main] c.TestJoin - r1: 0 r2: 0 cost: 1502
3.9 interrupt 方法详解
private static void test1() throws InterruptedException { Thread t1 = new Thread(()->{ sleep(1); }, "t1"); t1.start(); sleep(0.5); t1.interrupt(); log.debug(" 打断状态: {}", t1.isInterrupted()); }
java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at cn.itcast.n2.util.Sleeper.sleep(Sleeper.java:8) at cn.itcast.n4.TestInterrupt.lambda$test1$3(TestInterrupt.java:59) at java.lang.Thread.run(Thread.java:745) 21:18:10.374 [main] c.TestInterrupt - 打断状态: false
private static void test2() throws InterruptedException { Thread t2 = new Thread(()->{ while(true) { Thread current = Thread.currentThread(); boolean interrupted = current.isInterrupted(); if(interrupted) { log.debug(" 打断状态: {}", interrupted); break; } } }, "t2"); t2.start(); sleep(0.5); t2.interrupt(); }
20:57:37.964 [t2] c.TestInterrupt - 打断状态: true
Two Phase Termination,就是考虑在一个线程T1中如何优雅地终止另一个线程T2?这里的优雅指的是给T2一个料理后事的机会(如释放锁)。
如下所示:那么线程的isInterrupted()方法可以取得线程的打断标记,如果线程在睡眠sleep期间被打断,打断标记是不会变的,为false,但是sleep期间被打断会抛出异常,我们据此手动设置打断标记为true;如果是在程序正常运行期间被打断的,那么打断标记就被自动设置为true
代码实现如下:
@Slf4j public class Test11 { public static void main(String[] args) throws InterruptedException { TwoParseTermination twoParseTermination = new TwoParseTermination(); twoParseTermination.start(); Thread.sleep(3000); // 让监控线程执行一会儿 twoParseTermination.stop(); // 停止监控线程 } } @Slf4j class TwoParseTermination{ Thread thread ; public void start(){ thread = new Thread(()->{ while(true){ if (Thread.currentThread().isInterrupted()){ log.debug("线程结束。。正在料理后事中"); break; } try { Thread.sleep(500); log.debug("正在执行监控的功能"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } } }); thread.start(); } public void stop(){ thread.interrupt(); } }
private static void test3() throws InterruptedException { Thread t1 = new Thread(() -> { log.debug("park..."); LockSupport.park(); log.debug("unpark..."); log.debug("打断状态:{}", Thread.currentThread().isInterrupted()); }, "t1"); t1.start(); sleep(0.5); t1.interrupt(); }
21:11:52.795 [t1] c.TestInterrupt - park... 21:11:53.295 [t1] c.TestInterrupt - unpark... 21:11:53.295 [t1] c.TestInterrupt - 打断状态:true
private static void test4() { Thread t1 = new Thread(() -> { for (int i = 0; i < 5; i++) { log.debug("park..."); LockSupport.park(); log.debug("打断状态:{}", Thread.currentThread().isInterrupted()); } }); t1.start(); sleep(1); t1.interrupt(); }
21:13:48.783 [Thread-0] c.TestInterrupt - park... 21:13:49.809 [Thread-0] c.TestInterrupt - 打断状态:true 21:13:49.812 [Thread-0] c.TestInterrupt - park... 21:13:49.813 [Thread-0] c.TestInterrupt - 打断状态:true 21:13:49.813 [Thread-0] c.TestInterrupt - park... 21:13:49.813 [Thread-0] c.TestInterrupt - 打断状态:true 21:13:49.813 [Thread-0] c.TestInterrupt - park... 21:13:49.813 [Thread-0] c.TestInterrupt - 打断状态:true 21:13:49.813 [Thread-0] c.TestInterrupt - park... 21:13:49.813 [Thread-0] c.TestInterrupt - 打断状态:true
3.10 不推荐的方法
3.11 主线程与守护线程
log.debug("开始运行...");
Thread t1 = new Thread(() -> {
log.debug("开始运行...");
sleep(2);
log.debug("运行结束...");
}, "daemon");
// 设置该线程为守护线程
t1.setDaemon(true);
t1.start();
sleep(1);
log.debug("运行结束...");
08:26:38.123 [main] c.TestDaemon - 开始运行... 08:26:38.213 [daemon] c.TestDaemon - 开始运行... 08:26:39.215 [main] c.TestDaemon - 运行结束...
- 垃圾回收器线程就是一种守护线程
- Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等
3.12 五种状态
- 【初始状态】仅是在语言层面创建了线程对象,还未与操作系统线程关联
- 【可运行状态】(就绪状态)指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
- 【运行状态】指获取了 CPU 时间片运行中的状态
【阻塞状态】
- 如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入【阻塞状态】
- 等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
- 与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们
3.13 六种状态
- NEW 线程刚被创建,但是还没有调用 start() 方法
- RUNNABLE 当调用了 start() 方法之后,注意,Java API 层面的 RUNNABLE 状态涵盖了 操作系统 层面的
- BLOCKED , WAITING , TIMED_WAITING 都是 Java API 层面对【阻塞状态】的细分,后面会在状态转换一节详述
- TERMINATED 当线程代码运行结束
3.14 习题
本章小结
- 线程创建
- 线程重要 api,如 start,run,sleep,join,interrupt 等
- 线程状态
- 应用方面
- 异步调用:主线程执行期间,其它线程异步执行耗时操作
- 提高效率:并行计算,缩短运算时间
- 同步等待:join
- 统筹规划:合理使用线程,得到最优效果
- 线程运行流程:栈、栈帧、上下文切换、程序计数器
- Thread 两种创建方式 的源码
- 终止模式之两阶段终止
4. 共享模型之管程
4.1 共享带来的问题
- 在这些时候,算盘没利用起来(不能收钱了),老王觉得有点不划算
- 另外,小女也想用用算盘,如果总是小南占着算盘,让小女觉得不公平
- 于是,老王灵机一动,想了个办法 [ 让他们每人用一会,轮流使用算盘 ]
- 这样,当小南阻塞的时候,算盘可以分给小女使用,不会浪费,反之亦然
- 最近执行的计算比较复杂,需要存储一些中间结果,而学生们的脑容量(工作内存)不够,所以老王申请了一个笔记本(主存),把一些中间结果先记在本上
- 计算流程是这样的
- 但是由于分时系统,有一天还是发生了事故
- 小南刚读取了初始值 0 做了个 +1 运算,还没来得及写回结果
- 老王说 [ 小南,你的时间到了,该别人了,记住结果走吧 ],于是小南念叨着 [ 结果是1,结果是1...] 不甘心地
- 到一边待着去了(上下文切换)
- 老王说 [ 小女,该你了 ],小女看到了笔记本上还写着 0 做了一个 -1 运算,将结果 -1 写入笔记本
- 这时小女的时间也用完了,老王又叫醒了小南:[小南,把你上次的题目算完吧],小南将他脑海中的结果 1 写
- 入了笔记本
static int counter = 0; public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 5000; i++) { counter++; } }, "t1"); Thread t2 = new Thread(() -> { for (int i = 0; i < 5000; i++) { counter--; } }, "t2"); t1.start(); t2.start(); t1.join(); t2.join(); log.debug("{}",counter); }
- 一个程序运行多个线程本身是没有问题的
- 问题出在多个线程访问共享资源
- 多个线程读共享资源其实也没有问题
- 在多个线程对共享资源读写操作时发生指令交错,就会出现问题
- 一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区
static int counter = 0; static void increment() // 临界区 { counter++; } static void decrement() // 临界区 { counter--; }
4.2 synchronized 解决方案
- 阻塞式的解决方案:synchronized,Lock
- 非阻塞式的解决方案:原子变量
- 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
- 同步是由于线程执行的先后、顺序不同、需要一个线程等待其它线程运行到某个点
synchronized(对象) // 线程1, 线程2(blocked) { 临界区 }
static int counter = 0; static final Object room = new Object(); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 5000; i++) { synchronized (room) { counter++; } } }, "t1"); Thread t2 = new Thread(() -> { for (int i = 0; i < 5000; i++) { synchronized (room) { counter--; } } }, "t2"); t1.start(); t2.start(); t1.join(); t2.join(); log.debug("{}",counter); }
- synchronized(对象) 中的对象,可以想象为一个房间(room),有唯一入口(门)房间只能一次进入一人
- 进行计算,线程 t1,t2 想象成两个人
- 当线程 t1 执行到 synchronized(room) 时就好比 t1 进入了这个房间,并锁住了门拿走了钥匙,在门内执行
- count++ 代码
- 这时候如果 t2 也运行到了 synchronized(room) 时,它发现门被锁住了,只能在门外等待,发生了上下文切
- 换,阻塞住了
- 这中间即使 t1 的 cpu 时间片不幸用完,被踢出了门外(不要错误理解为锁住了对象就能一直执行下去哦),
- 这时门还是锁住的,t1 仍拿着钥匙,t2 线程还在阻塞状态进不来,只有下次轮到 t1 自己再次获得时间片时才
- 能开门进入
- 当 t1 执行完 synchronized{} 块内的代码,这时候才会从 obj 房间出来并解开门上的锁,唤醒 t2 线程把钥
- 匙给他。t2 线程这时才可以进入 obj 房间,锁住了门拿上钥匙,执行它的 count-- 代码
- 如果把 synchronized(obj) 放在 for 循环的外面,如何理解?-- 原子性
- 如果 t1 synchronized(obj1) 而 t2 synchronized(obj2) 会怎样运作?-- 锁对象
- 如果 t1 synchronized(obj) 而 t2 没有加会怎么样?如何理解?-- 锁对象
class Room { int value = 0; public void increment() { synchronized (this) { value++; } } public void decrement() { synchronized (this) { value--; } } public int get() { synchronized (this) { return value; } } } @Slf4j public class Test1 { public static void main(String[] args) throws InterruptedException { Room room = new Room(); Thread t1 = new Thread(() -> { for (int j = 0; j < 5000; j++) { room.increment(); } }, "t1"); Thread t2 = new Thread(() -> { for (int j = 0; j < 5000; j++) { room.decrement(); } }, "t2"); t1.start(); t2.start(); t1.join(); t2.join(); log.debug("count: {}" , room.get()); } }
4.3 方法上的 synchronized
class Test{ public synchronized void test() { } } 等价于 class Test{ public void test() { synchronized(this) { } } }
class Test{ public synchronized static void test() { } } 等价于 class Test{ public static void test() { synchronized(Test.class) { } } }
@Slf4j(topic = "c.Number") class Number{ public synchronized void a() { log.debug("1"); } public synchronized void b() { log.debug("2"); } } public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); }
@Slf4j(topic = "c.Number") class Number{ public synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } } public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); }
@Slf4j(topic = "c.Number") class Number{ public synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } public void c() { log.debug("3"); } } public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); new Thread(()->{ n1.c(); }).start(); }
@Slf4j(topic = "c.Number") class Number{ public synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } } public static void main(String[] args) { Number n1 = new Number(); Number n2 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n2.b(); }).start(); }
@Slf4j(topic = "c.Number") class Number{ public static synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } } public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); }
@Slf4j(topic = "c.Number") class Number{ public static synchronized void a() { sleep(1); log.debug("1"); } public static synchronized void b() { log.debug("2"); } } public static void main(String[] args) { Number n1 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n1.b(); }).start(); }
@Slf4j(topic = "c.Number") class Number{ public static synchronized void a() { sleep(1); log.debug("1"); } public synchronized void b() { log.debug("2"); } } public static void main(String[] args) { Number n1 = new Number(); Number n2 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n2.b(); }).start(); }
@Slf4j(topic = "c.Number") class Number{ public static synchronized void a() { sleep(1); log.debug("1"); } public static synchronized void b() { log.debug("2"); } } public static void main(String[] args) { Number n1 = new Number(); Number n2 = new Number(); new Thread(()->{ n1.a(); }).start(); new Thread(()->{ n2.b(); }).start(); }
4.4 变量的线程安全分析
- 如果它们没有共享,则线程安全
- 如果它们被共享了,根据它们的状态是否能够改变,又分两种情况
- 如果只有读操作,则线程安全
- 如果有读写操作,则这段代码是临界区,需要考虑线程安全
- 局部变量是线程安全的
- 但局部变量引用的对象则未必
- 如果该对象没有逃离方法的作用访问,它是线程安全的
- 如果该对象逃离方法的作用范围,需要考虑线程安全
public static void test1() { int i = 10; i++; }
public static void test1(); descriptor: ()V flags: ACC_PUBLIC, ACC_STATIC Code: stack=1, locals=1, args_size=0 0: bipush 10 2: istore_0 3: iinc 0, 1 6: return LineNumberTable: line 10: 0 line 11: 3 line 12: 6 LocalVariableTable: Start Length Slot Name Signature 3 4 0 i I
class ThreadUnsafe { ArrayList<String> list = new ArrayList<>(); public void method1(int loopNumber) { for (int i = 0; i < loopNumber; i++) { // { 临界区, 会产生竞态条件 method2(); method3(); // } 临界区 } } private void method2() { list.add("1"); } private void method3() { list.remove(0); } }
static final int THREAD_NUMBER = 2; static final int LOOP_NUMBER = 200; public static void main(String[] args) { ThreadUnsafe test = new ThreadUnsafe(); for (int i = 0; i < THREAD_NUMBER; i++) { new Thread(() -> { test.method1(LOOP_NUMBER); }, "Thread" + i).start(); } }
Exception in thread "Thread1" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at java.util.ArrayList.remove(ArrayList.java:496) at cn.itcast.n6.ThreadUnsafe.method3(TestThreadSafe.java:35) at cn.itcast.n6.ThreadUnsafe.method1(TestThreadSafe.java:26) at cn.itcast.n6.TestThreadSafe.lambda$main$0(TestThreadSafe.java:14) at java.lang.Thread.run(Thread.java:748)
分析:
无论哪个线程中的 method2 引用的都是同一个对象中的 list 成员变量
method3 与 method2 分析相同
class ThreadSafe { public final void method1(int loopNumber) { ArrayList<String> list = new ArrayList<>(); for (int i = 0; i < loopNumber; i++) { method2(list); method3(list); } } private void method2(ArrayList<String> list) { list.add("1"); } private void method3(ArrayList<String> list) { list.remove(0); } }
分析:
list 是局部变量,每个线程调用时会创建其不同实例,没有共享
而 method2 的参数是从 method1 中传递过来的,与 method1 中引用同一个对象
method3 的参数分析与 method2 相同
- 情况1:有其它线程调用 method2 和 method3
- 情况2:在 情况1 的基础上,为 ThreadSafe 类添加子类,子类覆盖 method2 或 method3 方法,即
class ThreadSafe { public final void method1(int loopNumber) { ArrayList<String> list = new ArrayList<>(); for (int i = 0; i < loopNumber; i++) { method2(list); method3(list); } } private void method2(ArrayList<String> list) { list.add("1"); } private void method3(ArrayList<String> list) { list.remove(0); } } class ThreadSafeSubClass extends ThreadSafe{ @Override public void method3(ArrayList<String> list) { new Thread(() -> { list.remove(0); }).start(); } }
- String
- Integer
- StringBuffffer
- Random
- Vector
- Hashtable
- java.util.concurrent 包下的类
Hashtable table = new Hashtable(); new Thread(()->{ table.put("key", "value1"); }).start(); new Thread(()->{ table.put("key", "value2"); }).start();
Hashtable table = new Hashtable(); // 线程1,线程2 if( table.get("key") == null) { table.put("key", value); }
public class Immutable{ private int value = 0; public Immutable(int value){ this.value = value; } public int getValue(){ return this.value; } }
public class Immutable{ private int value = 0; public Immutable(int value){ this.value = value; } public int getValue(){ return this.value; } public Immutable add(int v){ return new Immutable(this.value + v); } }
public class MyServlet extends HttpServlet { // 是否安全? Map<String,Object> map = new HashMap<>(); // 是否安全? String S1 = "..."; // 是否安全? final String S2 = "..."; // 是否安全? Date D1 = new Date(); // 是否安全? final Date D2 = new Date(); public void doGet(HttpServletRequest request, HttpServletResponse response) { // 使用上述变量 } }
public class MyServlet extends HttpServlet { // 是否安全? private UserService userService = new UserServiceImpl(); public void doGet(HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { // 记录调用次数 private int count = 0; public void update() { // ... count++; } }
@Aspect @Component public class MyAspect { // 是否安全? private long start = 0L; @Before("execution(* *(..))") public void before() { start = System.nanoTime(); } @After("execution(* *(..))") public void after() { long end = System.nanoTime(); System.out.println("cost time:" + (end-start)); } }
public class MyServlet extends HttpServlet { // 是否安全 private UserService userService = new UserServiceImpl(); public void doGet(HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { // 是否安全 private UserDao userDao = new UserDaoImpl(); public void update() { userDao.update(); } } public class UserDaoImpl implements UserDao { public void update() { String sql = "update user set password = ? where username = ?"; // 是否安全 try (Connection conn = DriverManager.getConnection("","","")){ // ... } catch (Exception e) { // ... } } }
public class MyServlet extends HttpServlet { // 是否安全 private UserService userService = new UserServiceImpl(); public void doGet(HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { // 是否安全 private UserDao userDao = new UserDaoImpl(); public void update() { userDao.update(); } }
public class MyServlet extends HttpServlet { // 是否安全 private UserService userService = new UserServiceImpl(); public void doGet(HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { public void update() { UserDao userDao = new UserDaoImpl(); userDao.update(); } } public class UserDaoImpl implements UserDao { // 是否安全 private Connection = null; public void update() throws SQLException { String sql = "update user set password = ? where username = ?"; conn = DriverManager.getConnection("","",""); // ... conn.close(); } }
public abstract class Test { public void bar() { // 是否安全 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); foo(sdf); } public abstract foo(SimpleDateFormat sdf); public static void main(String[] args) { new Test().bar(); } }
public void foo(SimpleDateFormat sdf) { String dateStr = "1999-10-11 00:00:00"; for (int i = 0; i < 20; i++) { new Thread(() -> { try { sdf.parse(dateStr); } catch (ParseException e) { e.printStackTrace(); } }).start(); } }
private static Integer i = 0; public static void main(String[] args) throws InterruptedException { List<Thread> list = new ArrayList<>(); for (int j = 0; j < 2; j++) { Thread thread = new Thread(() -> { for (int k = 0; k < 5000; k++) { synchronized (i) { i++; } } }, "" + j); list.add(thread); } list.stream().forEach(t -> t.start()); list.stream().forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } });
4.5 习题
public class ExerciseSell { public static void main(String[] args) { TicketWindow ticketWindow = new TicketWindow(2000); List<Thread> list = new ArrayList<>(); // 用来存储买出去多少张票 List<Integer> sellCount = new Vector<>(); for (int i = 0; i < 2000; i++) { Thread t = new Thread(() -> { // 分析这里的竞态条件 int count = ticketWindow.sell(randomAmount()); sellCount.add(count); }); list.add(t); t.start(); } list.forEach((t) -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); // 买出去的票求和 log.debug("selled count:{}",sellCount.stream().mapToInt(c -> c).sum()); // 剩余票数 log.debug("remainder count:{}", ticketWindow.getCount()); } // Random 为线程安全 static Random random = new Random(); // 随机 1~5 public static int randomAmount() { return random.nextInt(5) + 1; } } class TicketWindow { private int count; public TicketWindow(int count) { this.count = count; } public int getCount() { return count; } public int sell(int amount) { if (this.count >= amount) { this.count -= amount; return amount; } else { return 0; } } }
for /L %n in (1,1,10) do java -cp ".;C:\Users\manyh\.m2\repository\ch\qos\logback\logbackclassic\1.2.3\logback-classic-1.2.3.jar;C:\Users\manyh\.m2\repository\ch\qos\logback\logbackcore\1.2.3\logback-core-1.2.3.jar;C:\Users\manyh\.m2\repository\org\slf4j\slf4japi\1.7.25\slf4j-api-1.7.25.jar" cn.itcast.n4.exercise.ExerciseSell
public class ExerciseTransfer { public static void main(String[] args) throws InterruptedException { Account a = new Account(1000); Account b = new Account(1000); Thread t1 = new Thread(() -> { for (int i = 0; i < 1000; i++) { a.transfer(b, randomAmount()); } }, "t1"); Thread t2 = new Thread(() -> { for (int i = 0; i < 1000; i++) { b.transfer(a, randomAmount()); } }, "t2"); t1.start(); t2.start(); t1.join(); t2.join(); // 查看转账2000次后的总金额 log.debug("total:{}",(a.getMoney() + b.getMoney())); } // Random 为线程安全 static Random random = new Random(); // 随机 1~100 public static int randomAmount() { return random.nextInt(100) +1; } } class Account { private int money; public Account(int money) { this.money = money; } public int getMoney() { return money; } public void setMoney(int money) { this.money = money; } public void transfer(Account target, int amount) { if (this.money > amount) { this.setMoney(this.getMoney() - amount); target.setMoney(target.getMoney() + amount); } } }
public synchronized void transfer(Account target, int amount) { if (this.money > amount) { this.setMoney(this.getMoney() - amount); target.setMoney(target.getMoney() + amount); } }
4.6 Monitor 概念
Monitor 原理
Monitor被翻译为监视器或者说管程
每个java对象都可以关联一个Monitor,如果使用synchronized给对象上锁(重量级),该对象头的Mark Word中就被设置为指向Monitor对象的指针。
-
刚开始时Monitor中的Owner为null
-
当Thread-2 执行synchronized(obj){}代码时就会将Monitor的所有者Owner 设置为 Thread-2,上锁成功,Monitor中同一时刻只能有一个Owner
-
当Thread-2 占据锁时,如果线程Thread-3,Thread-4也来执行synchronized(obj){}代码,就会进入EntryList中变成BLOCKED状态
-
Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争时是非公平的
-
图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲wait-notify 时会分析
注意:synchronized 必须是进入同一个对象的 monitor 才有上述的效果不加 synchronized 的对象不会关联监视器,不遵从以上规则
synchronized原理
static final Object lock=new Object(); static int counter = 0; public static void main(String[] args) { synchronized (lock) { counter++; } }
反编译后的部分字节码
0 getstatic #2 <com/concurrent/test/Test17.lock> # 取得lock的引用(synchronized开始了) 3 dup # 复制操作数栈栈顶的值放入栈顶,即复制了一份lock的引用 4 astore_1 # 操作数栈栈顶的值弹出,即将lock的引用存到局部变量表中 5 monitorenter # 将lock对象的Mark Word置为指向Monitor指针 6 getstatic #3 <com/concurrent/test/Test17.counter> 9 iconst_1 10 iadd 11 putstatic #3 <com/concurrent/test/Test17.counter> 14 aload_1 # 从局部变量表中取得lock的引用,放入操作数栈栈顶 15 monitorexit # 将lock对象的Mark Word重置,唤醒EntryList 16 goto 24 (+8) # 下面是异常处理指令,可以看到,如果出现异常,也能自动地释放锁 19 astore_2 20 aload_1 21 monitorexit 22 aload_2 23 athrow 24 return
注意:方法级别的 synchronized 不会在字节码指令中有所体现
synchronized 原理进阶
轻量级锁
轻量级锁的使用场景是:如果一个对象虽然有多个线程要对它进行加锁,但是加锁的时间是错开的(也就是没有人可以竞争的),那么可以使用轻量级锁来进行优化。轻量级锁对使用者是透明的,即语法仍然是synchronized,假设有两个方法同步块,利用同一个对象加锁
static final Object obj = new Object(); public static void method1() { synchronized( obj ) { // 同步块 A method2(); } } public static void method2() { synchronized( obj ) { // 同步块 B } }
1.每次指向到synchronized代码块时,都会创建锁记录(Lock Record)对象,每个线程都会包括一个锁记录的结构,锁记录内部可以储存对象的Mark Word和对象引用reference
2. 让锁记录中的Object reference指向对象,并且尝试用cas(compare and sweep)替换Object对象的Mark Word ,将Mark Word 的值存入锁记录中
3.如果cas替换成功,那么对象的对象头储存的就是锁记录的地址和状态01,如下所示
-
如果是其它线程已经持有了该Object的轻量级锁,那么表示有竞争,将进入锁膨胀阶段
-
5.
-
成功则解锁成功
-
锁膨胀
如果在尝试加轻量级锁的过程中,cas操作无法成功,这是有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。
-
当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
自旋优化
重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换就获得了锁
1.自旋重试成功的情况
2.自旋重试失败的情况,自旋了一定次数还是没有等到持锁的线程释放锁
自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。Java 7 之后不能控制是否开启自旋功能
偏向锁
在轻量级的锁中,我们可以发现,如果同一个线程对同一个2对象进行重入锁时,也需要执行CAS操作,这是有点耗时滴,那么java6开始引入了偏向锁的东东,只有第一次使用CAS时将对象的Mark Word头设置为入锁线程ID,之后这个入锁线程再进行重入锁时,发现线程ID是自己的,那么就不用再进行CAS了
一个对象的创建过程
-
如果开启了偏向锁(默认是开启的),那么对象刚创建之后,Mark Word 最后三位的值101,并且这是它的Thread,epoch,age都是0,在加锁的时候进行设置这些的值.
-
偏向锁默认是延迟的,不会在程序启动的时候立刻生效,如果想避免延迟,可以添加虚拟机参数来禁用延迟:-
XX:BiasedLockingStartupDelay=0来禁用延迟 -
注意:处于偏向锁的对象解锁后,线程 id 仍存储于对象头中
-
实验Test18.java,加上虚拟机参数-XX:BiasedLockingStartupDelay=0进行测试
public static void main(String[] args) throws InterruptedException { Test1 t = new Test1(); test.parseObjectHeader(getObjectHeader(t)); synchronized (t){ test.parseObjectHeader(getObjectHeader(t)); } test.parseObjectHeader(getObjectHeader(t)); }
输出结果如下,三次输出的状态码都为101
biasedLockFlag (1bit): 1 LockFlag (2bit): 01 biasedLockFlag (1bit): 1 LockFlag (2bit): 01 biasedLockFlag (1bit): 1 LockFlag (2bit): 01
-
测试代码Test18.java 虚拟机参数
-XX:-UseBiasedLocking -
输出结果如下,最开始状态为001,然后加轻量级锁变成00,最后恢复成001
biasedLockFlag (1bit): 0 LockFlag (2bit): 01 LockFlag (2bit): 00 biasedLockFlag (1bit): 0 LockFlag (2bit): 01
测试 hashCode:当调用对象的hashcode方法的时候就会撤销这个对象的偏向锁,因为使用偏向锁时没有位置存hashcode的值了
-
测试代码如下,使用虚拟机参数
-XX:BiasedLockingStartupDelay=0,确保我们的程序最开始使用了偏向锁!但是结果显示程序还是使用了轻量级锁。 Test20.java
public static void main(String[] args) throws InterruptedException { Test1 t = new Test1(); t.hashCode(); test.parseObjectHeader(getObjectHeader(t)); synchronized (t){ test.parseObjectHeader(getObjectHeader(t)); } test.parseObjectHeader(getObjectHeader(t)); }
输出结果
biasedLockFlag (1bit): 0 LockFlag (2bit): 01 LockFlag (2bit): 00 biasedLockFlag (1bit): 0 LockFlag (2bit): 01
这里我们演示的是偏向锁撤销变成轻量级锁的过程,那么就得满足轻量级锁的使用条件,就是没有线程对同一个对象进行锁竞争,我们使用wait 和 notify 来辅助实现
-
代码 Test19.java,虚拟机参数
-XX:BiasedLockingStartupDelay=0确保我们的程序最开始使用了偏向锁! -
输出结果,最开始使用的是偏向锁,但是第二个线程尝试获取对象锁时,发现本来对象偏向的是线程一,那么偏向锁就会失效,加的就是轻量级锁
biasedLockFlag (1bit): 1 LockFlag (2bit): 01 biasedLockFlag (1bit): 1 LockFlag (2bit): 01 biasedLockFlag (1bit): 1 LockFlag (2bit): 01 biasedLockFlag (1bit): 1 LockFlag (2bit): 01 LockFlag (2bit): 00 biasedLockFlag (1bit): 0 LockFlag (2bit): 01
会使对象的锁变成重量级锁,因为wait/notify方法之后重量级锁才支持
批量重偏向
如果对象被多个线程访问,但是没有竞争,这时候偏向了线程一的对象又有机会重新偏向线程二,即可以不用升级为轻量级锁,可这和我们之前做的实验矛盾了呀,其实要实现重新偏向是要有条件的:就是超过20对象对同一个线程如线程一撤销偏向时,那么第20个及以后的对象才可以将撤销对线程一的偏向这个动作变为将第20个及以后的对象偏向线程二。Test21.java
4.6 wait和notify
建议先看看wait和notify方法的javadoc文档
4.6.1同步模式之保护性暂停
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果,要点:
-
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
-
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
-
JDK 中,join 的实现、Future 的实现,采用的就是此模式
-
因为要等待另一方的结果,因此归类到同步模式
代码:Test22.java Test23.java这是带超时时间的
Test23.java中jiang\'dao\'de关于超时的增强,在join(long millis) 的源码中得到了体现:
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { // join一个指定的时间 while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
多任务版 GuardedObject图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个生产者和消费者之间是一一对应的关系,但是生产者消费者模式并不是。rpc框架的调用中就使用到了这种模式。 Test24.java
要点
-
与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
-
消费队列可以用来平衡生产和消费的线程资源
-
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
-
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
-
JDK 中各种阻塞队列,采用的就是这种模式
“异步”的意思就是生产者产生消息之后消息没有被立刻消费,而“同步模式”中,消息在产生之后被立刻消费了。
我们写一个线程间通信的消息队列,要注意区别,像rabbit mq等消息框架是进程间通信的。
于是老王单开了一间休息室(调用 wait 方法),让小南到休息室(WaitSet)等着去了,但这时锁释放开,
小南于是可以离开休息室,重新进入竞争锁的队列
final static Object obj = new Object(); public static void main(String[] args) { new Thread(() -> { synchronized (obj) { log.debug("执行...."); try { obj.wait(); // 让线程在obj上一直等待下去 } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...."); } }).start(); new Thread(() -> { synchronized (obj) { log.debug("执行...."); try { obj.wait(); // 让线程在obj上一直等待下去 } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...."); } }).start(); // 主线程两秒后执行 sleep(2); log.debug("唤醒 obj 上其它线程"); synchronized (obj) { obj.notify(); // 唤醒obj上一个线程 // obj.notifyAll(); // 唤醒obj上所有等待线程 } }
20:00:53.096 [Thread-0] c.TestWaitNotify - 执行.... 20:00:53.099 [Thread-1] c.TestWaitNotify - 执行.... 20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程 20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码....
19:58:15.457 [Thread-0] c.TestWaitNotify - 执行.... 19:58:15.460 [Thread-1] c.TestWaitNotify - 执行.... 19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程 19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码.... 19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码....
// 暂停当前线程 LockSupport.park(); // 恢复某个线程的运行 LockSupport.unpark(暂停线程对象)
Thread t1 = new Thread(() -> { log.debug("start..."); sleep(1); log.debug("park..."); LockSupport.park(); log.debug("resume..."); },"t1"); t1.start(); sleep(2); log.debug("unpark..."); LockSupport.unpark(t1);
18:42:52.585 c.TestParkUnpark [t1] - start... 18:42:53.589 c.TestParkUnpark [t1] - park... 18:42:54.583 c.TestParkUnpark [main] - unpark... 18:42:54.583 c.TestParkUnpark [t1] - resume...
Thread t1 = new Thread(() -> { log.debug("start..."); sleep(2); log.debug("park..."); LockSupport.park(); log.debug("resume..."); }, "t1"); t1.start(); sleep(1); log.debug("unpark..."); LockSupport.unpark(t1);
4.7.2 park unpark 原理
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond和 _mutex
-
打个比喻线程就像一个旅人,Parker 就像他随身携带的背包,条件变量 _ cond就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
-
调用 park 就是要看需不需要停下来歇息
-
如果备用干粮耗尽,那么钻进帐篷歇息
-
如果备用干粮充足,那么不需停留,继续前进
-
-
调用 unpark,就好比令干粮充足
-
如果这时线程还在帐篷,就唤醒让他继续前进
-
如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
-
因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
-
-
可以不看例子,直接看实现过程
先调用park再调用upark的过程
1.先调用park
-
当前线程调用 Unsafe.park() 方法
-
检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁(mutex对象有个等待队列 _cond)
-
线程进入 _cond 条件变量阻塞
-
设置 _counter = 0
2.调用upark
-
调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
-
唤醒 _cond 条件变量中的 Thread_0
-
Thread_0 恢复运行
-
设置 _counter 为 0
先调用upark再调用park的过程
-
调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
-
当前线程调用 Unsafe.park() 方法
-
检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
-
设置 _counter 为 0
4.10 重新理解线程状态转换
假设有线程 Thread t
- 另外如果由于某个线程进入了死循环,导致其它线程一直等待,对于这种情况 linux 下可以通过 top 先定位到
- CPU 占用高的 Java 进程,再利用 top -Hp 进程id 来定位是哪个线程,最后再用 jstack 排查
有五位哲学家,围坐在圆桌旁。
顺序加锁的解决方案
- await 前需要获得锁
- await 执行后,会释放锁,进入 conditionObject 等待
- await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
- 竞争 lock 锁成功后,从 await 后继续执行
-
固定运行顺序,比如,必须先 2 后 1 打印
-
wait notify 版 Test35.java
-
Park Unpark 版 Test36.java
-
-
交替输出,线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现
-
wait notify 版 Test37.java
-
Lock 条件变量版 Test38.java
-
Park Unpark 版 Test39.java
-
5. 共享模型之内存
5.1 Java 内存模型
5.2 可见性
CPU 缓存结构原理
1. CPU 缓存结构
⚡ root@yihang01 ~ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 1 On-line CPU(s) list: 0 Thread(s) per core: 1 Core(s) per socket: 1 Socket(s): 1 NUMA node(s): 1 Vendor ID: GenuineIntel CPU family: 6 Model: 142 Model name: Intel(R) Core(TM) i7-8565U CPU @ 1.80GHz Stepping: 11 CPU MHz: 1992.002 BogoMIPS: 3984.00 Hypervisor vendor: VMware Virtualization type: full L1d cache: 32K L1i cache: 32K L2 cache: 256K L3 cache: 8192K NUMA node0 CPU(s): 0
[高位组标记][低位索引][偏移量]
2. CPU 缓存读
- 0 去内存读取新数据更新缓存行
- 1 再对比高位组标记是否一致
- 一致,根据偏移量返回缓存数据
- 不一致,去内存读取新数据更新缓存行
3. CPU 缓存一致性
4. 内存屏障
- 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
- 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
- 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
- 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
* 模式之 Balking
1. 定义
2. 实现
public class MonitorService { // 用来表示是否已经有线程已经在执行启动了 private volatile boolean starting; public void start() { log.info("尝试启动监控线程..."); synchronized (this) { if (starting) { return; } starting = true; } // 真正启动监控线程... } }
[http-nio-8080-exec-1] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(false) [http-nio-8080-exec-1] cn.itcast.monitor.service.MonitorService - 监控线程已启动... [http-nio-8080-exec-2] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true) [http-nio-8080-exec-3] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true) [http-nio-8080-exec-4] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true)
public final class Singleton { private Singleton() { } private static Singleton INSTANCE = null; public static synchronized Singleton getInstance() { if (INSTANCE != null) { return INSTANCE; } INSTANCE = new Singleton(); return INSTANCE; } }
5.3 有序性
static int i; static int j; // 在某个线程内执行如下赋值操作 i = ...; j = ...;
i = ...;
j = ...;
j = ...;
i = ...;
原理之指令级并行
// 可以重排的例子 int a = 10; // 指令1 int b = 20; // 指令2 System.out.println( a + b ); // 不能重排的例子 int a = 10; // 指令1 int b = a - 5; // 指令2
5. SuperScalar 处理器