ArrayBlockingQueue:数组实现的有界阻塞队列,按照FIFO的原则对元素进行排序
LinkedBlockingQueue:链表实现的有界阻塞队列,此队列的默认和最大长度为Integer.MAX_VALUE,按照FIFO原则进行排序
PriorityBlockingQueue:优先级排序的无界阻塞队列,默认情况下采用自然顺序升序排列,也可以自定义compareTo()方法指定元素排序规则
DelayQueue:优先级队列实现的无界阻塞队列
SynchronousQueue:不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素
LinkedTransferQueue:链表实现的无界阻塞队列
LinkedBlockingDeque:链表实现的双向阻塞队列
ArrayBlockingQueue队列分析:
创建对象时候new ArrayBlockingQueue<>(100)
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化一个指定大小容量的数组
this.items = new Object[capacity];
//重入锁,出队和入队持有这把锁,创建一个非公平锁
lock = new ReentrantLock(fair);
//创建一个非空的等待队列condition
notEmpty = lock.newCondition();
//创建一个非满的等待队列condition
notFull = lock.newCondition();
}
添加元素:
add():
//本质上调用offer方法,如果失败抛出异常
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer():
//传入对象判空,获取重入锁,如果队列满了返回false,否则添加到队列中
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//当putIndex等于数组长度时候重置为0
if (++putIndex == items.length)
putIndex = 0;
count++;
//唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在
//消费者线程阻塞,可以开始获取元素
notEmpty.signal();
}
put()方法:
// 和offer方法基本项目,不同的是在队列满了之后,会阻塞
public void put(E e) throws InterruptedException {
checkNotNull(e);
//优先允许在等待时候由其他线程调用等待线程的interrupt方法来中断等待直接返回
//lock方法是尝试获取锁成功后才响应中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列满了的情况下,当前线程会被阻塞挂起到等待队列中
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
获取元素:
take()方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列为空,则直接阻塞
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//出队列
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//默认获取0位置的元素
E x = (E) items[takeIndex];
items[takeIndex] = null;
//如果拿到数组的最大值,那么重置为0,继续从头部获取数据
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒因为队列满了而导致被阻塞的线程继续添加数据
notFull.signal();
return x;
}
remove()方法
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列不为空
if (count > 0) {
//获取下一个要添加元素时的索引
final int putIndex = this.putIndex;
//获取要移除的元素的索引
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
poll()方法:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判断当前队列是否为空,如果为空返回null,否则出队列
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
原子类操作:
原子更新基本类型:AtomicBoolean AtomicInteger AtomicLong
原子更新数组:AtomicIntegerArray AtomicLongArray AtomicReferenceArray
原子更新引用:AtomicReference AtomicReferenceFieldUpdater
原子更新字段:AtomicIntegerFieldUpdater AtomicLongFieldUpdater
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
static {
try {
//获取当前value这个变量在内存中的偏移量,后续会基于这个偏移量从内存中获取到value值和当前值来进行比较
//实现乐观锁
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//通过死循环,CAS乐观锁来做原子递增
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
ThreadLocal分析:
调用ThreadLocal的set()方法,如果是首次调用,则会调用createMap()
public void set(T value) {
//获取当前线程
Thread t = Thread.currentThread();
//通过线程获取ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);//首次添加值的时候
}
//创建ThreadLocalMap
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
//创建一个16大小的Entry数组
table = new Entry[INITIAL_CAPACITY];
//计算数组下标
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
//在指定位置存放一个Entry
table[i] = new Entry(firstKey, firstValue);
size = 1;
//扩容大小
setThreshold(INITIAL_CAPACITY);
}
//继承WeakReference,所以key是弱引用,value是强引用,在GC回收时候会被回收掉
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
三者之间关系:
ThreadLocal中存在一个静态内部类ThreadLocalMap,在存储数据时候,ThreadLocal的引用作为key存入ThreadLocalMap中的Entry,Value就是对应的值,Thread类中存放ThreadLocal.ThreadLocalMap,因为key是weakRefence(弱引用),所以在每次GC时候会被回收,在ThreadLocalMap实现时候考虑了这个问题,所以在get() set() remove()时候会进行回收处理。如果我们没有手动调用这些方法则会导致内存泄漏问题
当第二次调用set()方法时候:
public void set(T value) {
Thread t = Thread.currentThread();
//通过当前线程获取TheadLocalMap
ThreadLocalMap map = getMap(t);
//如果不为null
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
//使用线性探索来解决脏数据问题,写入:找到发生冲突最近的空闲单元 查找:从发生冲突的位置往后查找
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
//获取数组下标位置
int i = key.threadLocalHashCode & (len-1);
//从i开始往后一直遍历到数组最后一个Entry(线性探索)
for (Entry e = tab[i];e != null; e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//如果key相等,则直接覆盖
if (k == key) {
e.value = value;
return;
}
//如果key为null,则用心的key,value覆盖,同时清理历史key=null的数据
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
//如果超过阈值则要进行扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}