锁的两种实现
可重入锁
可以多次加锁, 加锁几次就要解锁几次
读写锁
读写分离, 提高多线程的并发读的能力.
- 不同线程: 读锁和写锁互斥, 写锁和写锁互斥.
- 同一线程: 支持锁降级, 不支持锁升级(写锁级别比读锁高, 获取到写锁后可以继续获取读锁, 但是反之则不行)
// 锁降级案例演示: 成功
@Test
public void func01Test(){
// 读写锁对象并没有实现Lock接口, 需要获取其内部的读锁对象或写锁对象
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();
System.out.println(Thread.currentThread().getName() + " : 申请写锁");
writeLock.lock();
System.out.println(Thread.currentThread().getName() + " : 获取写锁");
System.out.println(Thread.currentThread().getName() + " : 申请读锁");
readLock.lock();
System.out.println(Thread.currentThread().getName() + " : 获取读锁");
writeLock.unlock();
readLock.unlock();
}
// 锁升级案例演示: 失败
@Test
public void func02Test(){
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();
System.out.println(Thread.currentThread().getName() + " : 申请读锁");
readLock.lock();
System.out.println(Thread.currentThread().getName() + " : 获取读锁");
System.out.println(Thread.currentThread().getName() + " : 申请写锁");
// 会阻塞在这里, 主线程获取到readLock后无法再获取writeLock
writeLock.lock();
System.out.println(Thread.currentThread().getName() + " : 获取写锁");
readLock.unlock();
writeLock.unlock();
}
公平锁和非公平锁
非公平锁: 效率更高, 但可能某个线程饥饿
公平锁: 线程放入到一个阻塞队列中, 每次将锁分配给队首线程
默认情况是非公平锁.
// 默认情况是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0){
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
}
// 在队列中没有线程等待, 突然多个并发线程竞争一把锁的时候会导致多个线程同时进入到CAS操作, 可能造成不公平
public final boolean hasQueuedPredecessors() {
// Read fields in reverse initialization order
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
CAS操作
AQS抽象队列同步器
加锁过程
public final void acquire(int arg) {
// tryAcquire(arg)->true: 成功获取锁, 直接返回
// tryAcquire(arg)->false: 未能成功获取锁, 说明锁已经被其它线程占用, 进入阻塞队列acquireQueued()
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
// 当前线程放置到阻塞队列后, 中断当前线程, 等待被唤醒
selfInterrupt();
}
}
// 公平锁的tryAcquire()重载方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// state属性表示上了几次锁, 如果一个线程上了3次锁, 则state属性值为3
int c = getState();
if (c == 0) {
// c为0表示锁没有被任意一个线程占用
// hasQueuedPredecessors()在并发的情况下可能多个线程都能判断成功, 相当于设置一道简单的门槛
// CAS操作只会有一个线程能够成功, 通过第一道门槛筛选一部分线程, 减轻CAS的压力
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 此时成功进行CAS的那个唯一的线程获取到锁, 其它未能成功CAS的线程跳出if直接返回false
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 当前线程已经是锁的拥有者, 这是获取锁的线程又一次上锁, 也是为什么state会增加的原因
int nextc = c + acquires;
// 加锁次数太多, 导致发生int上溢
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 公平锁只有在上面两种情况下才能够成功获取锁, 否则获取锁失败
return false;
}
// 非公平锁的tryAcquire()重载方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 将当前线程封装成Node节点, 准备放入到阻塞队列末尾
private Node addWaiter(Node mode) {
// 将当前线程封装到为一个Node, 准备放入队列中
Node node = new Node(Thread.currentThread(), mode);
// 非首次入队可以不需要考虑初始化
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 尝试通过CAS快速加入到队列中, 如果失败再通过enq()来加入队列
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 首次入队或CAS快速入队失败
enq(node);
return node;
}
// 自旋入队, 这里面是一个死循环
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 非首次入队
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 处理CAS快速入队失败的情况, 并且这里面的CAS也可能再次失败, 因此使用死循环
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋机制的标志: 出现死循环(伪)
for (;;) {
// 获取前一个节点
final Node p = node.predecessor();
// 如果当前线程是队列中的第一个节点, 则会再次进行获取锁的操作
if (p == head && tryAcquire(arg)) {
// 如果当前线程获取了锁, 那么当前节点出队列(相当于设置其为head)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 没有获取到锁或者不是队列中的第一个线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// pred是node的前驱节点, 一直重复尝试, 直到将pred的状态设置为SIGNAL才停止
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
// ws > 0 <-> 获取锁的操作已经取消
if (ws > 0) {
// 找到第一个没有被取消的节点, 通过双向链表操作将中间的取消节点删除掉
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果获取锁的操作没有被取消, 则尝试设置其等待状态为SIGNAL(设置操作在此执行)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// LockSupport是一个工具类, park()方法将线程挂起, 直到其它线程发送一个unpack()信号或者interrupt()中断信号
LockSupport.park(this);
return Thread.interrupted();
}