锁的两种实现

可重入锁

可以多次加锁, 加锁几次就要解锁几次

读写锁

读写分离, 提高多线程的并发读的能力.

  • 不同线程: 读锁和写锁互斥, 写锁和写锁互斥.
  • 同一线程: 支持锁降级, 不支持锁升级(写锁级别比读锁高, 获取到写锁后可以继续获取读锁, 但是反之则不行)
// 锁降级案例演示: 成功
@Test
public void func01Test(){
    // 读写锁对象并没有实现Lock接口, 需要获取其内部的读锁对象或写锁对象
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    Lock readLock = readWriteLock.readLock();
    Lock writeLock = readWriteLock.writeLock();
    System.out.println(Thread.currentThread().getName() + " : 申请写锁");
    writeLock.lock();
    System.out.println(Thread.currentThread().getName() + " : 获取写锁");
    System.out.println(Thread.currentThread().getName() + " : 申请读锁");
    readLock.lock();
    System.out.println(Thread.currentThread().getName() + " : 获取读锁");
    writeLock.unlock();
    readLock.unlock();
}

// 锁升级案例演示: 失败
@Test
public void func02Test(){
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    Lock readLock = readWriteLock.readLock();
    Lock writeLock = readWriteLock.writeLock();
    System.out.println(Thread.currentThread().getName() + " : 申请读锁");
    readLock.lock();
    System.out.println(Thread.currentThread().getName() + " : 获取读锁");
    System.out.println(Thread.currentThread().getName() + " : 申请写锁");
    // 会阻塞在这里, 主线程获取到readLock后无法再获取writeLock
    writeLock.lock();
    System.out.println(Thread.currentThread().getName() + " : 获取写锁");
    readLock.unlock();
    writeLock.unlock();
}

公平锁和非公平锁

非公平锁: 效率更高, 但可能某个线程饥饿

公平锁: 线程放入到一个阻塞队列中, 每次将锁分配给队首线程

默认情况是非公平锁.

// 默认情况是非公平锁    
public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}


static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0){
                throw new Error("Maximum lock count exceeded");                
            }
            setState(nextc);
            return true;
        }
        return false;
    }
}

// 在队列中没有线程等待, 突然多个并发线程竞争一把锁的时候会导致多个线程同时进入到CAS操作, 可能造成不公平
public final boolean hasQueuedPredecessors() {
    // Read fields in reverse initialization order
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

CAS操作

AQS抽象队列同步器

加锁过程

public final void acquire(int arg) {
    // tryAcquire(arg)->true: 成功获取锁, 直接返回
    // tryAcquire(arg)->false: 未能成功获取锁, 说明锁已经被其它线程占用, 进入阻塞队列acquireQueued()
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        // 当前线程放置到阻塞队列后, 中断当前线程, 等待被唤醒
        selfInterrupt();        
    }
}


// 公平锁的tryAcquire()重载方法
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // state属性表示上了几次锁, 如果一个线程上了3次锁, 则state属性值为3
    int c = getState();
    if (c == 0) {
        // c为0表示锁没有被任意一个线程占用
        // hasQueuedPredecessors()在并发的情况下可能多个线程都能判断成功, 相当于设置一道简单的门槛
        // CAS操作只会有一个线程能够成功, 通过第一道门槛筛选一部分线程, 减轻CAS的压力
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 此时成功进行CAS的那个唯一的线程获取到锁, 其它未能成功CAS的线程跳出if直接返回false
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 当前线程已经是锁的拥有者, 这是获取锁的线程又一次上锁, 也是为什么state会增加的原因
        int nextc = c + acquires;
        // 加锁次数太多, 导致发生int上溢
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 公平锁只有在上面两种情况下才能够成功获取锁, 否则获取锁失败
    return false;
}


// 非公平锁的tryAcquire()重载方法
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}



// 将当前线程封装成Node节点, 准备放入到阻塞队列末尾
private Node addWaiter(Node mode) {
    // 将当前线程封装到为一个Node, 准备放入队列中
    Node node = new Node(Thread.currentThread(), mode);

    // 非首次入队可以不需要考虑初始化
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 尝试通过CAS快速加入到队列中, 如果失败再通过enq()来加入队列
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    // 首次入队或CAS快速入队失败
    enq(node);
    return node;
}

// 自旋入队, 这里面是一个死循环
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 非首次入队
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 处理CAS快速入队失败的情况, 并且这里面的CAS也可能再次失败, 因此使用死循环
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}




final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋机制的标志: 出现死循环(伪)
        for (;;) {
            // 获取前一个节点
            final Node p = node.predecessor();
            // 如果当前线程是队列中的第一个节点, 则会再次进行获取锁的操作
            if (p == head && tryAcquire(arg)) {
                // 如果当前线程获取了锁, 那么当前节点出队列(相当于设置其为head)
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 没有获取到锁或者不是队列中的第一个线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
                interrupted = true;                
            }

        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// pred是node的前驱节点, 一直重复尝试, 直到将pred的状态设置为SIGNAL才停止
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;

    // ws > 0 <-> 获取锁的操作已经取消
    if (ws > 0) {
        // 找到第一个没有被取消的节点, 通过双向链表操作将中间的取消节点删除掉
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果获取锁的操作没有被取消, 则尝试设置其等待状态为SIGNAL(设置操作在此执行)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private final boolean parkAndCheckInterrupt() {
    // LockSupport是一个工具类, park()方法将线程挂起, 直到其它线程发送一个unpack()信号或者interrupt()中断信号
    LockSupport.park(this);
    return Thread.interrupted();
}

解锁过程


   转载规则


《锁》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
无锁并发 无锁并发
无锁并发使用 CAS 代替锁案例演示public interface Account { int getBalance(); void withdraw(int amount); } public class Thread
2023-03-14
下一篇 
Synchronized关键字详解 Synchronized关键字详解
Synchronized关键字详解Monitor对象普通对象 wait()和notify()只有获取了锁的线程才能够调用 wait() 方法和 notify() 方法。 可重入锁 ReentrantLock通过 lockInterrupti
2023-03-13
  目录