Java 5新增了Atomic包,里面类包含方法getAndIncrement()以及getAndDecrement(),这两个方法实现了原子加以及原子减操作,但是比较不同的是这两个操作没有使用任何加锁机制,属于无锁操作。
在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁(后面的章节还会谈到锁)。
锁机制存在以下问题:
(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。
(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。
(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。
volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。
定义:
悲观锁(Pessimistic Lock):
每次获取数据的时候,都会担心数据被修改,所以每次获取数据的时候都会进行加锁,确保在自己使用的过程中数据不会被别人修改,使用完成后进行数据解锁。由于数据进行加锁,期间对该数据进行读写的其他线程都会进行等待。
乐观锁(Optimistic Lock):
每次获取数据的时候,都不会担心数据被修改,所以每次获取数据的时候都不会进行加锁假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。
适用场景:
悲观锁:比较适合写入操作比较频繁的场景,如果出现大量的读取操作,每次读取的时候都会进行加锁,这样会增加大量的锁的开销,降低了系统的吞吐量。
乐观锁:比较适合读取操作比较频繁的场景,如果出现大量的写入操作,数据发生冲突的可能性就会增大,为了保证数据的一致性,应用层需要不断的重新获取数据,这样会增加大量的查询操作,降低了系统的吞吐量。
总结:两种所各有优缺点,读取频繁使用乐观锁,写入频繁使用悲观锁。
CAS 操作
java.util.concurrent(J.U.C)种提供的atomic包中的类使用的是乐观锁,乐观锁用到的机制就是CAS,Compare and Swap。
CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
Atomic实现原子性的原理就是不断拿当前工作内存中的值和主内存的值去比较,才更新。
AtomicXXX:CAS 、Unsafe.compareAndSwapInt
看一下AtomicInteger.getAndIncrement的源码
/** * Atomically increments by one the current value. * * @return the previous value */ public final int getAndIncrement() { // 主要是调用了unsafe的方法 // private static final Unsafe unsafe = Unsafe.getUnsafe(); return unsafe.getAndAddInt(this, valueOffset, 1); }
/** * 获取底层当前的值并且+1 * @param var1 需要操作的AtomicInteger 对象 * @param var2 当前的值 * @param var4 要增加的值 */ public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { // 获取底层的该对象当前的值 var5 = this.getIntVolatile(var1, var2); // 获取完底层的值和自增操作之间,可能系统的值已经又被其他线程改变了 //如果又被改变了,则重新计算系统底层的值,并重新执行本地方法 } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
/** * 本地的CAS方法核心 * @param var1 需要操作的AtomicInteger 对象 * @param var2 当前本地变量中的的值 * @param var4 要增加的值 * @param var5 从底层传过来的值 * @Return 如果当前本地变量的值(var2)与底层的值(var4)不等,则返回false,否则更新为var5的值并返回True */ public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
package com.xidian.example; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import com.xidian.annotations.ThreadSafe; import lombok.extern.slf4j.Slf4j; @Slf4j @ThreadSafe public class AtomicExample1 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); // count.getAndIncrement(); } }