文章简介
前面我有文章介绍了synchronized的基本原理,这篇文章我会从jvm源码分析synchronized的实现逻辑,希望让大家有一个更加深度的认识
内容导航
-
从synchronized的字节码说起
-
什么是monitor
-
分析synchronized的源码
从synchronized的字节码说起
由于synchronized的实现是在jvm层面,所以我们如果要看它的源码,需要从字节码入手。这段代码演示了synchronized作为实例锁的两种用法,我们观察一下这段代码生成的字节码
-
public class App -
{ -
public synchronized void test1(){ -
} -
public void test2(){ -
synchronized (this){ -
-
} -
} -
public static void main( String[] args ){ -
System.out.println( "Hello World!" ); -
} -
}
进入classpath目录下找到App.class文件, 在cmd中输入 javap -v App.class查看字节码
-
public synchronized void test1(); -
descriptor: ()V -
flags: ACC_PUBLIC, ACC_SYNCHRONIZED -
Code: -
stack=0, locals=1, args_size=1 -
0: return -
LineNumberTable: -
line 10: 0 -
LocalVariableTable: -
Start Length Slot Name Signature -
0 1 0 this Lcom/gupaoedu/openclass/App; -
-
public void test2(); -
descriptor: ()V -
flags: ACC_PUBLIC -
Code: -
stack=2, locals=3, args_size=1 -
0: aload_0 -
1: dup -
2: astore_1 -
3: monitorenter //监视器进入,获取锁 -
4: aload_1 -
5: monitorexit //监视器退出,释放锁 -
6: goto 14 -
9: astore_2 -
10: aload_1 -
11: monitorexit -
12: aload_2 -
13: athrow -
14: return
通过字节码我们可以发现,修饰在方法层面的同步关键字,会多一个 ACC_SYNCHRONIZED的flag;修饰在代码块层面的同步块会多一个 monitorenter和 monitorexit关键字。无论采用哪一种方式,本质上都是对一个对象的监视器(monitor)进行获取,而这个获取的过程是排他的,也就是同一个时刻只能有一个线程获得同步块对象的监视器。 在 synchronized的原理分析这篇文章中,有提到对象监视器。
synchronized关键字经过编译之后,会在同步块的前后分别形成monitorenter和monitorexit这两个字节码指令。当我们的JVM把字节码加载到内存的时候,会对这两个指令进行解析。这两个字节码都需要一个Object类型的参数来指明要锁定和解锁的对象。如果Java程序中的synchronized明确指定了对象参数,那么这个对象就是加锁和解锁的对象;如果没有明确指定,那就根据synchronized修饰的是实例方法还是类方法,获取对应的对象实例或Class对象来作为锁对象
什么是monitor
在分析源代码之前需要了解oop, oopDesc, markOop等相关概念,在Synchronized的原理分析这篇文章中,我们讲到了synchronized的同步锁实际上是存储在对象头中,这个对象头是一个Java对象在内存中的布局的一部分。Java中的每一个Object在JVM内部都会有一个native的C++对象oop/oopDesc与之对应。在hotspot源码 oop.hpp中oopDesc的定义如下
-
class oopDesc { -
friend class VMStructs; -
private: -
volatile markOop _mark; -
union _metadata { -
Klass* _klass; -
narrowKlass _compressed_klass; -
} _metadata;
其中 markOop就是我们所说的Mark Word,用于存储锁的标识。 hotspot源码 markOop.hpp文件代码片段
-
class markOopDesc: public oopDesc { -
private: -
// Conversion -
uintptr_t value() const { return (uintptr_t) this; } -
-
public: -
// Constants -
enum { age_bits = 4, -
lock_bits = 2, -
biased_lock_bits = 1, -
max_hash_bits = BitsPerWord - age_bits - lock_bits - biased_lock_bits, -
hash_bits = max_hash_bits > 31 ? 31 : max_hash_bits, -
cms_bits = LP64_ONLY(1) NOT_LP64(0), -
epoch_bits = 2 -
}; -
... -
}
markOopDesc继承自oopDesc,并且扩展了自己的monitor方法,这个方法返回一个ObjectMonitor指针对象,在hotspot虚拟机中,采用ObjectMonitor类来实现monitor
-
bool has_monitor() const { -
return ((value() & monitor_value) != 0); -
} -
ObjectMonitor* monitor() const { -
assert(has_monitor(), "check"); -
// Use xor instead of &~ to provide one extra tag-bit check. -
return (ObjectMonitor*) (value() ^ monitor_value); -
}
在 ObjectMonitor.hpp中,可以看到ObjectMonitor的定义
-
class ObjectMonitor { -
... -
ObjectMonitor() { -
_header = NULL; //markOop对象头 -
_count = 0; -
_waiters = 0, //等待线程数 -
_recursions = 0; //重入次数 -
_object = NULL; -
_owner = NULL; //获得ObjectMonitor对象的线程 -
_WaitSet = NULL; //处于wait状态的线程,会被加入到waitSet -
_WaitSetLock = 0 ; -
_Responsible = NULL ; -
_succ = NULL ; -
_cxq = NULL ; -
FreeNext = NULL ; -
_EntryList = NULL ; //处于等待锁BLOCKED状态的线程 -
_SpinFreq = 0 ; -
_SpinClock = 0 ; -
OwnerIsThread = 0 ; -
_previous_owner_tid = 0; //监视器前一个拥有线程的ID -
} -
...
简单总结一下,同步块的实现使用
monitorenter和monitorexit指令,而同步方法是依靠方法修饰符上的flagACC_SYNCHRONIZED来完成。其本质是对一个对象监视器(monitor)进行获取,这个获取过程是排他的,也就是同一个时刻只能有一个线程获得由synchronized所保护对象的监视器。所谓的监视器,实际上可以理解为一个同步工具,它是由Java对象进行描述的。在Hotspot中,是通过ObjectMonitor来实现,每个对象中都会内置一个ObjectMonitor对象
简单分析synchronized的源码
从 monitorenter和 monitorexit这两个指令来开始阅读源码,JVM将字节码加载到内存以后,会对这两个指令进行解释执行, monitorenter, monitorexit的指令解析是通过 InterpreterRuntime.cpp中的两个方法实现
-
InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem) -
InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem) -
//JavaThread 当前获取锁的线程 -
//BasicObjectLock 基础对象锁
我们基于monitorenter为入口,沿着偏向锁->轻量级锁->重量级锁的路径来分析synchronized的实现过程
-
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem)) -
#ifdef ASSERT -
thread->last_frame().interpreter_frame_verify_monitor(elem); -
#endif -
... -
if (UseBiasedLocking) { -
// Retry fast entry if bias is revoked to avoid unnecessary inflation -
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK); -
} else { -
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK); -
} -
... -
#ifdef ASSERT -
thread->last_frame().interpreter_frame_verify_monitor(elem); -
#endif -
IRT_END
UseBiasedLocking是在JVM启动的时候,是否启动偏向锁的标识
-
如果支持偏向锁,则执行
ObjectSynchronizer::fast_enter的逻辑 -
如果不支持偏向锁,则执行
ObjectSynchronizer::slow_enter逻辑,绕过偏向锁,直接进入轻量级锁
ObjectSynchronizer::fast_enter的实现在 synchronizer.cpp文件中,代码如下
-
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) { -
if (UseBiasedLocking) { //判断是否开启了偏向锁 -
if (!SafepointSynchronize::is_at_safepoint()) { //如果不处于全局安全点 -
//通过`revoke_and_rebias`这个函数尝试获取偏向锁 -
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD); -
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {//如果是撤销与重偏向直接返回 -
return; -
} -
} else {//如果在安全点,撤销偏向锁 -
assert(!attempt_rebias, "can not rebias toward VM thread"); -
BiasedLocking::revoke_at_safepoint(obj); -
} -
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now"); -
} -
-
slow_enter (obj, lock, THREAD) ; -
}
fast_enter方法的主要流程做一个简单的解释
-
再次检查偏向锁是否开启
-
当处于不安全点时,通过
revoke_and_rebias尝试获取偏向锁,如果成功则直接返回,如果失败则进入轻量级锁获取过程 -
revoke_and_rebias这个偏向锁的获取逻辑在biasedLocking.cpp中 -
如果偏向锁未开启,则进入
slow_enter获取轻量级锁的流程
偏向锁的获取逻辑
BiasedLocking::revoke_and_rebias 是用来获取当前偏向锁的状态(可能是偏向锁撤销后重新偏向)。这个方法的逻辑在 biasedLocking.cpp中
-
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) { -
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint"); -
markOop mark = obj->mark(); //获取锁对象的对象头 -
//判断mark是否为可偏向状态,即mark的偏向锁标志位为1,锁标志位为 01,线程id为null -
if (mark->is_biased_anonymously() && !attempt_rebias) { -
//这个分支是进行对象的hashCode计算时会进入,在一个非全局安全点进行偏向锁撤销 -
markOop biased_value = mark; -
//创建一个非偏向的markword -
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age()); -
//Atomic:cmpxchg_ptr是CAS操作,通过cas重新设置偏向锁状态 -
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark); -
if (res_mark == biased_value) {//如果CAS成功,返回偏向锁撤销状态 -
return BIAS_REVOKED; -
} -
} else if (mark->has_bias_pattern()) {//如果锁对象为可偏向状态(biased_lock:1, lock:01,不管线程id是否为空),尝试重新偏向 -
Klass* k = obj->klass(); -
markOop prototype_header = k->prototype_header(); -
//如果已经有线程对锁对象进行了全局锁定,则取消偏向锁操作 -
if (!prototype_header->has_bias_pattern()) { -
markOop biased_value = mark; -
//CAS 更新对象头markword为非偏向锁 -
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark); -
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked"); -
return BIAS_REVOKED; //返回偏向锁撤销状态 -
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) { -
//如果偏向锁过期,则进入当前分支 -
if (attempt_rebias) {//如果允许尝试获取偏向锁 -
assert(THREAD->is_Java_thread(), ""); -
markOop biased_value = mark; -
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch()); -
//通过CAS 操作, 将本线程的 ThreadID 、时间错、分代年龄尝试写入对象头中 -
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark); -
if (res_mark == biased_value) { //CAS成功,则返回撤销和重新偏向状态 -
return BIAS_REVOKED_AND_REBIASED; -
} -
} else {//不尝试获取偏向锁,则取消偏向锁 -
//通过CAS操作更新分代年龄 -
markOop biased_value = mark; -
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age()); -
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark); -
if (res_mark == biased_value) { //如果CAS操作成功,返回偏向锁撤销状态 -
return BIAS_REVOKED; -
} -
} -
} -
} -
...//省略 -
}
偏向锁的撤销
当到达一个全局安全点时,这时会根据偏向锁的状态来判断是否需要撤销偏向锁,调用 revoke_at_safepoint方法,这个方法也是在 biasedLocking.cpp中定义的
-
void BiasedLocking::revoke_at_safepoint(Handle h_obj) { -
assert(SafepointSynchronize::is_at_safepoint(), "must only be called while at safepoint"); -
oop obj = h_obj(); -
//更新撤销偏向锁计数,并返回偏向锁撤销次数和偏向次数 -
HeuristicsResult heuristics = update_heuristics(obj, false); -
if (heuristics == HR_SINGLE_REVOKE) {//可偏向且未达到批量处理的阈值(下面会单独解释) -
revoke_bias(obj, false, false, NULL); //撤销偏向锁 -
} else if ((heuristics == HR_BULK_REBIAS) || -
(heuristics == HR_BULK_REVOKE)) {//如果是多次撤销或者多次偏向 -
//批量撤销 -
bulk_revoke_or_rebias_at_safepoint(obj, (heuristics == HR_BULK_REBIAS), false, NULL); -
} -
clean_up_cached_monitor_info(); -
}
偏向锁的释放,需要等待全局安全点(在这个时间点上没有正在执行的字节码),首先暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否还活着,如果线程不处于活动状态,则将对象头设置成无锁状态。如果线程仍然活着,则会升级为轻量级锁,遍历偏向对象的所记录。栈帧中的锁记录和对象头的Mark Word要么重新偏向其他线程,要么恢复到无锁,或者标记对象不适合作为偏向锁。最后唤醒暂停的线程。
JVM内部为每个类维护了一个偏向锁revoke计数器,对偏向锁撤销进行计数,当这个值达到指定阈值时,JVM会认为这个类的偏向锁有问题,需要重新偏向(rebias),对所有属于这个类的对象进行重偏向的操作成为
批量重偏向(bulk rebias)。在做bulk rebias时,会对这个类的epoch的值做递增,这个epoch会存储在对象头中的epoch字段。在判断这个对象是否获得偏向锁的条件是:markword的biased_lock:1、lock:01、threadid和当前线程id相等、epoch字段和所属类的epoch值相同,如果epoch的值不一样,要么就是撤销偏向锁、要么就是rebias; 如果这个类的revoke计数器的值继续增加到一个阈值,那么jvm会认为这个类不适合偏向锁,就需要进行bulk revoke操作
轻量级锁的获取逻辑
轻量级锁的获取,是调用 ::slow_enter方法,该方法同样位于 synchronizer.cpp文件中
-
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) { -
markOop mark = obj->mark(); -
assert(!mark->has_bias_pattern(), "should not see bias pattern here"); -
-
if (mark->is_neutral()) { //如果当前是无锁状态, markword的biase_lock:0,lock:01 -
//直接把mark保存到BasicLock对象的_displaced_header字段 -
lock->set_displaced_header(mark); -
//通过CAS将mark word更新为指向BasicLock对象的指针,更新成功表示获得了轻量级锁 -
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) { -
TEVENT (slow_enter: release stacklock) ; -
return ; -
} -
// Fall through to inflate() ... -
} -
//如果markword处于加锁状态、且markword中的ptr指针指向当前线程的栈帧,表示为重入操作,不需要争抢锁 -
else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { -
assert(lock != mark->locker(), "must not re-lock the same lock"); -
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); -
lock->set_displaced_header(NULL); -
return; -
} -
-
#if 0 -
// The following optimization isn't particularly useful. -
if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) { -
lock->set_displaced_header (NULL) ; -
return ; -
} -
#endif -
//代码执行到这里,说明有多个线程竞争轻量级锁,轻量级锁通过`inflate`进行膨胀升级为重量级锁 -
lock->set_displaced_header(markOopDesc::unused_mark()); -
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD); -
}
轻量级锁的获取逻辑简单再整理一下
-
mark->is_neutral()方法,is_neutral这个方法是在markOop.hpp中定义,如果biased_lock:0且lock:01表示无锁状态 -
如果mark处于无锁状态,则进入步骤(3),否则执行步骤(5)
-
把mark保存到BasicLock对象的displacedheader字段
-
通过CAS尝试将markword更新为指向BasicLock对象的指针,如果更新成功,表示竞争到锁,则执行同步代码,否则执行步骤(5)
-
如果当前mark处于加锁状态,且mark中的ptr指针指向当前线程的栈帧,则执行同步代码,否则说明有多个线程竞争轻量级锁,轻量级锁需要膨胀升级为重量级锁
轻量级锁的释放逻辑
轻量级锁的释放是通过 monitorexit调用
-
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem)) -
#ifdef ASSERT -
thread->last_frame().interpreter_frame_verify_monitor(elem); -
#endif -
Handle h_obj(thread, elem->obj()); -
assert(Universe::heap()->is_in_reserved_or_null(h_obj()), -
"must be NULL or an object"); -
if (elem == NULL || h_obj()->is_unlocked()) { -
THROW(vmSymbols::java_lang_IllegalMonitorStateException()); -
} -
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread); -
// Free entry. This must be done here, since a pending exception might be installed on -
// exit. If it is not cleared, the exception handling code will try to unlock the monitor again. -
elem->set_obj(NULL); -
#ifdef ASSERT -
thread->last_frame().interpreter_frame_verify_monitor(elem); -
#endif -
IRT_END
这段代码中主要是通过 ObjectSynchronizer::slow_exit来执行
-
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) { -
fast_exit (object, lock, THREAD) ; -
}
ObjectSynchronizer::fast_exit的代码如下
-
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) { -
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here"); -
// if displaced header is null, the previous enter is recursive enter, no-op -
markOop dhw = lock->displaced_header(); //获取锁对象中的对象头 -
markOop mark ; -
if (dhw == NULL) { -
// Recursive stack-lock. -
// Diagnostics -- Could be: stack-locked, inflating, inflated. -
mark = object->mark() ; -
assert (!mark->is_neutral(), "invariant") ; -
if (mark->has_locker() && mark != markOopDesc::INFLATING()) { -
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ; -
} -
if (mark->has_monitor()) { -
ObjectMonitor * m = mark->monitor() ; -
assert(((oop)(m->object()))->mark() == mark, "invariant") ; -
assert(m->is_entered(THREAD), "invariant") ; -
} -
return ; -
} -
-
mark = object->mark() ; //获取线程栈帧中锁记录(LockRecord)中的markword -
-
// If the object is stack-locked by the current thread, try to -
// swing the displaced header from the box back to the mark. -
if (mark == (markOop) lock) { -
assert (dhw->is_neutral(), "invariant") ; -
//通过CAS尝试将Displaced Mark Word替换回对象头,如果成功,表示锁释放成功。 -
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) { -
TEVENT (fast_exit: release stacklock) ; -
return; -
} -
} -
//锁膨胀,调用重量级锁的释放锁方法 -
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ; -
}
轻量级锁的释放也比较简单,就是将当前线程栈帧中锁记录空间中的Mark Word替换到锁对象的对象头中,如果成功表示锁释放成功。否则,锁膨胀成重量级锁,实现重量级锁的释放锁逻辑
锁膨胀的过程分析
重量级锁是通过对象内部的监视器(monitor)来实现,而monitor的本质是依赖操作系统底层的MutexLock实现的。我们先来看锁的膨胀过程,从前面的分析中已经知道了所膨胀的过程是通过 ObjectSynchronizer::inflate方法实现的,代码如下
-
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) { -
// Inflate mutates the heap ... -
// Relaxing assertion for bug 6320749. -
assert (Universe::verify_in_progress() || -
!SafepointSynchronize::is_at_safepoint(), "invariant") ; -
-
for (;;) { //通过无意义的循环实现自旋操作 -
const markOop mark = object->mark() ; -
assert (!mark->has_bias_pattern(), "invariant") ; -
-
if (mark->has_monitor()) {//has_monitor是markOop.hpp中的方法,如果为true表示当前锁已经是重量级锁了 -
ObjectMonitor * inf = mark->monitor() ;//获得重量级锁的对象监视器直接返回 -
assert (inf->header()->is_neutral(), "invariant"); -
assert (inf->object() == object, "invariant") ; -
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid"); -
return inf ; -
} -
-
if (mark == markOopDesc::INFLATING()) {//膨胀等待,表示存在线程正在膨胀,通过continue进行下一轮的膨胀 -
TEVENT (Inflate: spin while INFLATING) ; -
ReadStableMark(object) ; -
continue ; -
} -
-
if (mark->has_locker()) {//表示当前锁为轻量级锁,以下是轻量级锁的膨胀逻辑 -
ObjectMonitor * m = omAlloc (Self) ;//获取一个可用的ObjectMonitor -
// Optimistically prepare the objectmonitor - anticipate successful CAS -
// We do this before the CAS in order to minimize the length of time -
// in which INFLATING appears in the mark. -
m->Recycle(); -
m->_Responsible = NULL ; -
m->OwnerIsThread = 0 ; -
m->_recursions = 0 ; -
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class -
/**将object->mark_addr()和mark比较,如果这两个值相等,则将object->mark_addr() -
改成markOopDesc::INFLATING(),相等返回是mark,不相等返回的是object->mark_addr()**/ -
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ; -
if (cmp != mark) {//CAS失败 -
omRelease (Self, m, true) ;//释放监视器 -
continue ; // 重试 -
} -
-
markOop dmw = mark->displaced_mark_helper() ; -
assert (dmw->is_neutral(), "invariant") ; -
-
//CAS成功以后,设置ObjectMonitor相关属性 -
m->set_header(dmw) ; -
-
-
m->set_owner(mark->locker()); -
m->set_object(object); -
// TODO-FIXME: assert BasicLock->dhw != 0. -
-
-
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ; -
object->release_set_mark(markOopDesc::encode(m)); -
-
-
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ; -
TEVENT(Inflate: overwrite stacklock) ; -
if (TraceMonitorInflation) { -
if (object->is_instance()) { -
ResourceMark rm; -
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s", -
(void *) object, (intptr_t) object->mark(), -
object->klass()->external_name()); -
} -
} -
return m ; //返回ObjectMonitor -
} -
//如果是无锁状态 -
assert (mark->is_neutral(), "invariant"); -
ObjectMonitor * m = omAlloc (Self) ; ////获取一个可用的ObjectMonitor -
//设置ObjectMonitor相关属性 -
m->Recycle(); -
m->set_header(mark); -
m->set_owner(NULL); -
m->set_object(object); -
m->OwnerIsThread = 1 ; -
m->_recursions = 0 ; -
m->_Responsible = NULL ; -
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class -
/**将object->mark_addr()和mark比较,如果这两个值相等,则将object->mark_addr() -
改成markOopDesc::encode(m),相等返回是mark,不相等返回的是object->mark_addr()**/ -
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) { -
//CAS失败,说明出现了锁竞争,则释放监视器重行竞争锁 -
m->set_object (NULL) ; -
m->set_owner (NULL) ; -
m->OwnerIsThread = 0 ; -
m->Recycle() ; -
omRelease (Self, m, true) ; -
m = NULL ; -
continue ; -
// interference - the markword changed - just retry. -
// The state-transitions are one-way, so there's no chance of -
// live-lock -- "Inflated" is an absorbing state. -
} -
-
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ; -
TEVENT(Inflate: overwrite neutral) ; -
if (TraceMonitorInflation) { -
if (object->is_instance()) { -
ResourceMark rm; -
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s", -
(void *) object, (intptr_t) object->mark(), -
object->klass()->external_name()); -
} -
} -
return m ; //返回ObjectMonitor对象 -
} -
}
锁膨胀的过程稍微有点复杂,整个锁膨胀的过程是通过自旋来完成的,具体的实现逻辑简答总结以下几点
-
1.
mark->has_monitor()判断如果当前锁对象为重量级锁,也就是lock:10,则执行(2),否则执行(3) -
2.通过
mark->monitor获得重量级锁的对象监视器ObjectMonitor并返回,锁膨胀过程结束 -
3.如果当前锁处于
INFLATING,说明有其他线程在执行锁膨胀,那么当前线程通过自旋等待其他线程锁膨胀完成 -
4.如果当前是轻量级锁状态
mark->has_locker(),则进行锁膨胀。首先,通过omAlloc方法获得一个可用的ObjectMonitor,并设置初始数据;然后通过CAS将对象头设置为`markOopDesc:INFLATING,表示当前锁正在膨胀,如果CAS失败,继续自旋 -
5.如果是无锁状态,逻辑类似第4步骤
锁膨胀的过程实际上是获得一个ObjectMonitor对象监视器,而真正抢占锁的逻辑,在
ObjectMonitor::enter方法里面
重量级锁的竞争逻辑
重量级锁的竞争,在 ObjectMonitor::enter方法中,代码文件在 objectMonitor.cpp重量级锁的代码就不一一分析了,简单说一下下面这段代码主要做的几件事
-
通过CAS将monitor的
_owner字段设置为当前线程,如果设置成功,则直接返回 -
如果之前的
_owner指向的是当前的线程,说明是重入,执行_recursions++增加重入次数 -
如果当前线程获取监视器锁成功,将
_recursions设置为1,_owner设置为当前线程 -
如果获取锁失败,则等待锁释放
-
void ATTR ObjectMonitor::enter(TRAPS) { -
// The following code is ordered to check the most common cases first -
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors. -
Thread * const Self = THREAD ; -
void * cur ; -
-
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; -
if (cur == NULL) {//CAS成功 -
// Either ASSERT _recursions == 0 or explicitly set _recursions = 0. -
assert (_recursions == 0 , "invariant") ; -
assert (_owner == Self, "invariant") ; -
// CONSIDER: set or assert OwnerIsThread == 1 -
return ; -
} -
-
if (cur == Self) { -
// TODO-FIXME: check for integer overflow! BUGID 6557169. -
_recursions ++ ; -
return ; -
} -
-
if (Self->is_lock_owned ((address)cur)) { -
assert (_recursions == 0, "internal state error"); -
_recursions = 1 ; -
// Commute owner from a thread-specific on-stack BasicLockObject address to -
// a full-fledged "Thread *". -
_owner = Self ; -
OwnerIsThread = 1 ; -
return ; -
} -
-
// We've encountered genuine contention. -
assert (Self->_Stalled == 0, "invariant") ; -
Self->_Stalled = intptr_t(this) ; -
-
// Try one round of spinning *before* enqueueing Self -
// and before going through the awkward and expensive state -
// transitions. The following spin is strictly optional ... -
// Note that if we acquire the monitor from an initial spin -
// we forgo posting JVMTI events and firing DTRACE probes. -
if (Knob_SpinEarly && TrySpin (Self) > 0) { -
assert (_owner == Self , "invariant") ; -
assert (_recursions == 0 , "invariant") ; -
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; -
Self->_Stalled = 0 ; -
return ; -
} -
-
assert (_owner != Self , "invariant") ; -
assert (_succ != Self , "invariant") ; -
assert (Self->is_Java_thread() , "invariant") ; -
JavaThread * jt = (JavaThread *) Self ; -
assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ; -
assert (jt->thread_state() != _thread_blocked , "invariant") ; -
assert (this->object() != NULL , "invariant") ; -
assert (_count >= 0, "invariant") ; -
-
// Prevent deflation at STW-time. See deflate_idle_monitors() and is_busy(). -
// Ensure the object-monitor relationship remains stable while there's contention. -
Atomic::inc_ptr(&_count); -
-
EventJavaMonitorEnter event; -
-
{ // Change java thread status to indicate blocked on monitor enter. -
JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this); -
-
DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt); -
if (JvmtiExport::should_post_monitor_contended_enter()) { -
JvmtiExport::post_monitor_contended_enter(jt, this); -
} -
-
OSThreadContendState osts(Self->osthread()); -
ThreadBlockInVM tbivm(jt); -
-
Self->set_current_pending_monitor(this); -
-
// TODO-FIXME: change the following for(;;) loop to straight-line code. -
for (;;) { -
jt->set_suspend_equivalent(); -
// cleared by handle_special_suspend_equivalent_condition() -
// or java_suspend_self() -
-
EnterI (THREAD) ; -
-
if (!ExitSuspendEquivalent(jt)) break ; -
-
// -
// We have acquired the contended monitor, but while we were -
// waiting another thread suspended us. We don't want to enter -
// the monitor while suspended because that would surprise the -
// thread that suspended us. -
// -
_recursions = 0 ; -
_succ = NULL ; -
exit (false, Self) ; -
-
jt->java_suspend_self(); -
} -
Self->set_current_pending_monitor(NULL); -
} -
...//此处省略无数行代码
如果获取锁失败,则需要通过自旋的方式等待锁释放,自旋执行的方法是 ObjectMonitor::EnterI,部分代码如下
-
将当前线程封装成ObjectWaiter对象node,状态设置成TS_CXQ
-
通过自旋操作将node节点push到_cxq队列
-
node节点添加到_cxq队列之后,继续通过自旋尝试获取锁,如果在指定的阈值范围内没有获得锁,则通过park将当前线程挂起,等待被唤醒
-
void ATTR ObjectMonitor::EnterI (TRAPS) { -
Thread * Self = THREAD ; -
...//省略很多代码 -
ObjectWaiter node(Self) ; -
Self->_ParkEvent->reset() ; -
node._prev = (ObjectWaiter *) 0xBAD ; -
node.TState = ObjectWaiter::TS_CXQ ; -
-
// Push "Self" onto the front of the _cxq. -
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock. -
// Note that spinning tends to reduce the rate at which threads -
// enqueue and dequeue on EntryList|cxq. -
ObjectWaiter * nxt ; -
for (;;) { //自旋,讲node添加到_cxq队列 -
node._next = nxt = _cxq ; -
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ; -
-
// Interference - the CAS failed because _cxq changed. Just retry. -
// As an optional optimization we retry the lock. -
if (TryLock (Self) > 0) { -
assert (_succ != Self , "invariant") ; -
assert (_owner == Self , "invariant") ; -
assert (_Responsible != Self , "invariant") ; -
return ; -
} -
} -
...//省略很多代码 -
//node节点添加到_cxq队列之后,继续通过自旋尝试获取锁,如果在指定的阈值范围内没有获得锁,则通过park将当前线程挂起,等待被唤醒 -
for (;;) { -
if (TryLock (Self) > 0) break ; -
assert (_owner != Self, "invariant") ; -
-
if ((SyncFlags & 2) && _Responsible == NULL) { -
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ; -
} -
-
// park self //通过park挂起当前线程 -
if (_Responsible == Self || (SyncFlags & 1)) { -
TEVENT (Inflated enter - park TIMED) ; -
Self->_ParkEvent->park ((jlong) RecheckInterval) ; -
// Increase the RecheckInterval, but clamp the value. -
RecheckInterval *= 8 ; -
if (RecheckInterval > 1000) RecheckInterval = 1000 ; -
} else { -
TEVENT (Inflated enter - park UNTIMED) ; -
Self->_ParkEvent->park() ;//当前线程挂起 -
} -
-
if (TryLock(Self) > 0) break ; //当线程被唤醒时,会从这里继续执行 -
-
-
TEVENT (Inflated enter - Futile wakeup) ; -
if (ObjectMonitor::_sync_FutileWakeups != NULL) { -
ObjectMonitor::_sync_FutileWakeups->inc() ; -
} -
++ nWakeups ; -
-
if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ; -
-
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) { -
Self->_ParkEvent->reset() ; -
OrderAccess::fence() ; -
} -
if (_succ == Self) _succ = NULL ; -
-
-
OrderAccess::fence() ; -
} -
...//省略很多代码 -
}
TryLock(self)的代码是在 ObjectMonitor::TryLock定义的,代码的实现如下
代码的实现原理很简单,通过自旋,CAS设置monitor的_owner字段为当前线程,如果成功,表示获取到了锁,如果失败,则继续被挂起
-
int ObjectMonitor::TryLock (Thread * Self) { -
for (;;) { -
void * own = _owner ; -
if (own != NULL) return 0 ; -
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) { -
// Either guarantee _recursions == 0 or set _recursions = 0. -
assert (_recursions == 0, "invariant") ; -
assert (_owner == Self, "invariant") ; -
// CONSIDER: set or assert that OwnerIsThread == 1 -
return 1 ; -
} -
// The lock had been free momentarily, but we lost the race to the lock. -
// Interference -- the CAS failed. -
// We can either return -1 or retry. -
// Retry doesn't make as much sense because the lock was just acquired. -
if (true) return -1 ; -
} -
}
重量级锁的释放
重量级锁的释放是通过 ObjectMonitor::exit来实现的,释放以后会通知被阻塞的线程去竞争锁
-
判断当前锁对象中的owner没有指向当前线程,如果owner指向的BasicLock在当前线程栈上,那么将_owner指向当前线程
-
如果当前锁对象中的_owner指向当前线程,则判断当前线程重入锁的次数,如果不为0,继续执行ObjectMonitor::exit(),直到重入锁次数为0为止
-
释放当前锁,并根据QMode的模式判断,是否将_cxq中挂起的线程唤醒。还是其他操作
-
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) { -
Thread * Self = THREAD ; -
if (THREAD != _owner) {//如果当前锁对象中的_owner没有指向当前线程 -
//如果_owner指向的BasicLock在当前线程栈上,那么将_owner指向当前线程 -
if (THREAD->is_lock_owned((address) _owner)) { -
// Transmute _owner from a BasicLock pointer to a Thread address. -
// We don't need to hold _mutex for this transition. -
// Non-null to Non-null is safe as long as all readers can -
// tolerate either flavor. -
assert (_recursions == 0, "invariant") ; -
_owner = THREAD ; -
_recursions = 0 ; -
OwnerIsThread = 1 ; -
} else { -
// NOTE: we need to handle unbalanced monitor enter/exit -
// in native code by throwing an exception. -
// TODO: Throw an IllegalMonitorStateException ? -
TEVENT (Exit - Throw IMSX) ; -
assert(false, "Non-balanced monitor enter/exit!"); -
if (false) { -
THROW(vmSymbols::java_lang_IllegalMonitorStateException()); -
} -
return; -
} -
} -
//如果当前,线程重入锁的次数,不为0,那么就重新走ObjectMonitor::exit,直到重入锁次数为0为止 -
if (_recursions != 0) { -
_recursions--; // this is simple recursive enter -
TEVENT (Inflated exit - recursive) ; -
return ; -
} -
...//此处省略很多代码 -
for (;;) { -
if (Knob_ExitPolicy == 0) { -
OrderAccess::release_store(&_owner, (void*)NULL); //释放锁 -
OrderAccess::storeload(); // See if we need to wake a successor -
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) { -
TEVENT(Inflated exit - simple egress); -
return; -
} -
TEVENT(Inflated exit - complex egress); -
//省略部分代码... -
} -
//省略部分代码... -
ObjectWaiter * w = NULL; -
int QMode = Knob_QMode; -
//根据QMode的模式判断, -
//如果QMode == 2则直接从_cxq挂起的线程中唤醒 -
if (QMode == 2 && _cxq != NULL) { -
w = _cxq; -
ExitEpilog(Self, w); -
return; -
} -
//省略部分代码... 省略的代码为根据QMode的不同,不同的唤醒机制 -
} -
}
根据不同的策略(由QMode指定),从cxq或EntryList中获取头节点,通过ObjectMonitor::ExitEpilog方法唤醒该节点封装的线程,唤醒操作最终由unpark完成
-
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) { -
assert (_owner == Self, "invariant") ; -
-
// Exit protocol: -
// 1. ST _succ = wakee -
// 2. membar #loadstore|#storestore; -
// 2. ST _owner = NULL -
// 3. unpark(wakee) -
-
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ; -
ParkEvent * Trigger = Wakee->_event ; -
-
// Hygiene -- once we've set _owner = NULL we can't safely dereference Wakee again. -
// The thread associated with Wakee may have grabbed the lock and "Wakee" may be -
// out-of-scope (non-extant). -
Wakee = NULL ; -
-
// Drop the lock -
OrderAccess::release_store_ptr (&_owner, NULL) ; -
OrderAccess::fence() ; // ST _owner vs LD in unpark() -
-
if (SafepointSynchronize::do_call_back()) { -
TEVENT (unpark before SAFEPOINT) ; -
} -
-
DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self); -
Trigger->unpark() ; //unpark唤醒线程 -
-
// Maintain stats and report events to JVMTI -
if (ObjectMonitor::_sync_Parks != NULL) { -
ObjectMonitor::_sync_Parks->inc() ; -
} -
}
分析源码,需要很大的耐心,希望大家能有耐心看下去。