无锁并发

无锁并发

使用 CAS 代替锁

案例演示

public interface Account {
    int getBalance();

    void withdraw(int amount);
}
public class ThreadUnsafeAccount implements Account {
    private int balance;

    public ThreadUnsafeAccount() {
    }

    public ThreadUnsafeAccount(int balance) {
        this.balance = balance;
    }

    public void withdraw(int amount) {
        this.balance -= amount;
    }

    public int getBalance() {
        return balance;
    }

    public void setBalance(int balance) {
        this.balance = balance;
    }
}
public class SynchronizedAccount implements Account {
    private int balance;

    public SynchronizedAccount(int balance) {
        this.balance = balance;
    }

    @Override
    public int getBalance() {
        synchronized (this) {
            return this.balance;
        }
    }

    @Override
    public void withdraw(int amount) {
        synchronized (this) {
            this.balance -= amount;
        }
    }
}
public class CASAccount implements Account {
    private AtomicInteger balance;

    public CASAccount() {
    }

    public CASAccount(int balance) {
        this.balance = new AtomicInteger(balance);
    }

    public int getBalance() {
        return balance.get();
    }

    public void setBalance(int balance) {
        this.balance = new AtomicInteger(balance);
    }

    public void withdraw(int amount) {
        while (true) {
            int expect = balance.get();
            int update = expect - amount;
            // CAS操作
            if (balance.compareAndSet(expect, update)) {
                break;
            }
        }
    }
}
public class AccountMain {
    public static final int COUNT = 1000;
    public static final int ACCOUNT = 20000;

    public static void runDemo(Account account) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < COUNT; i++) {
            threads.add(new Thread(() -> account.withdraw(20)));
        }

        long begin = System.nanoTime();
        threads.forEach(Thread::start);
        threads.forEach(thread -> {
            try {
                // 主线程中调用每一个子线程的join()方法, 主线程等待每一个子线程运行结束
                thread.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getClass().getSimpleName() + ".getBalance() = " + account.getBalance());
        System.out.println("cost times = " + (end - begin) / 1_000_000 + " ms");
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) {

            ThreadUnsafeAccount threadUnsafeAccount = new ThreadUnsafeAccount(ACCOUNT);
            SynchronizedAccount synchronizedAccount = new SynchronizedAccount(ACCOUNT);
            CASAccount casAccount = new CASAccount(ACCOUNT);

            runDemo(threadUnsafeAccount);
            runDemo(synchronizedAccount);
            runDemo(casAccount);
        }
    }
}

CAS 底层原理

用当前线程获取到的值和共享变量的最新值进行比较,如果相同,则通过CAS原子操作进行设置。(最终一致性)

因此,CAS 需要搭配 volatile 关键字进行使用,而 AtomicInteger 类中的值 value 则是使用了 volatile 进行修饰。

最终一致性(ABA问题)演示

线程无法感知到其它线程对共享变量的修改,在大多数情况下是没有影响的。但是如果希望判断是否有其他线程对该共享变量进行修改,那么需要额外使用版本号,即不仅比较值,而且比较版本号,每次更新版本号加1。

public class ABACAS {
    private static AtomicInteger num = new AtomicInteger(1);

    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            while (true) {
                int prev = num.get();
                int next = prev - 1;
                if (num.compareAndSet(prev, next)) {
                    break;
                }
            }
        }).start();

        TimeUnit.SECONDS.sleep(2);

        new Thread(() -> {
            while (true) {
                int prev = num.get();
                int next = prev + 1;
                if (num.compareAndSet(prev, next)) {
                    break;
                }
            }
        }).start();

        TimeUnit.SECONDS.sleep(2);

        //主线程无法感知到这个1经历过1->0->1的变化, 只要满足最终一致性即可
        num.compareAndSet(1, 10);
        System.out.println("num.get() = " + num.get());
    }
}
public class ABACASV2 {
    // version版本号是int类型, 如果发生整数上溢该怎么办?
    private static AtomicStampedReference<String> reference = new AtomicStampedReference<>("hello", 1);

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (true) {
                int version = reference.getStamp();
                String prev = reference.getReference();
                String next = "HELLO";
                if (reference.compareAndSet(prev, next, version, version + 1)) {
                    System.out.println(Thread.currentThread().getName() + ".getReference() = " + reference.getReference());
                    break;
                }
            }
        }).start();

        TimeUnit.SECONDS.sleep(2);


        new Thread(() -> {
            while (true) {
                int version = reference.getStamp();
                String prev = reference.getReference();
                String next = "hello";
                if (reference.compareAndSet(prev, next, version, version + 1)) {
                    System.out.println(Thread.currentThread().getName() + ".getReference() = " + reference.getReference());
                    break;
                }
            }
        }).start();

        TimeUnit.SECONDS.sleep(2);

        reference.compareAndSet("hello", "HelloWorld", 1, 2);
        System.out.println(Thread.currentThread().getName() + ".getReference() = " + reference.getReference());
    }
}

AtomicXXX 工具类

AtomicInteger 原子整数

AtomicReference 原子引用

原子数组

AtomicReferenceFieldUpdater 字段更新器

  • 通过静态方法 newUpdater() 来创建字段更新器
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

@Data
@NoArgsConstructor
@AllArgsConstructor
class Student {
    // CAS 相关工具类必须搭配 volatile 变量
    volatile String name;
}

public class FieldUpdater {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) {

            AtomicReferenceFieldUpdater<Student, String> studentNameUpdater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");

            Student student = new Student("root");

            new Thread(() -> {
                studentNameUpdater.compareAndSet(student, "root", "thread-0");
            }).start();

            new Thread(() -> {
                studentNameUpdater.compareAndSet(student, "root", "thread-1");
            }).start();

            Thread.sleep(50);
            System.out.println("student = " + student);
        }
    }
}

使用 Unsafe 类来模拟实现 AtomicInteger 原子整数类

import lombok.SneakyThrows;
import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class AtomicIntegerFake {
    private volatile int value;
    private static final Unsafe unsafe;
    private static final long valueOffset;


    static {
        // 通过反射获取unsafe对象
        try {
            // 这里在JDK8之后可以直接通过Unsafe.getUnsafe()来获取Unsafe对象
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
            valueOffset = unsafe.objectFieldOffset(AtomicIntegerFake.class.getDeclaredField("value"));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    public AtomicIntegerFake() {
    }

    public AtomicIntegerFake(int value) {
        this.value = value;
    }


    @SneakyThrows
    public void compareAndSet(int prev, int next) {
        while (true) {
            if (unsafe.compareAndSwapInt(this, valueOffset, prev, next)) {
                break;
            }
        }
    }

    /**
     * 这些方法都必须通过unsafe对象直接进行调用, 而不能通过方法重用的思想调用当前类中的其它方法
     *
     * @param amount
     */
    public void increase(int amount) {
        unsafe.compareAndSwapInt(this, valueOffset, this.value, this.value + amount);
    }

    public void decrease(int amount) {
        unsafe.compareAndSwapInt(this, valueOffset, this.value, this.value - amount);
    }

    public int get() {
        return this.value;
    }
}

使用自定义的 AtomicIntegerFake 来替换掉原有的 AtomicInteger 来进行测试

import org.example.cas.atomic.AtomicIntegerFake;

public class AtomicIntegerFakeAccount implements Account {
    private AtomicIntegerFake balance;

    public AtomicIntegerFakeAccount() {

    }

    public AtomicIntegerFakeAccount(int balance) {
        this.balance = new AtomicIntegerFake(balance);
    }


    public int getBalance() {
        return balance.get();
    }

    public void setBalance(int balance) {
        this.balance = new AtomicIntegerFake(balance);
    }

    public void withdraw(int amount) {
        balance.decrease(amount);
    }
}

   转载规则


《无锁并发》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
深度优先搜索 深度优先搜索
矩阵背景面试题13. 机器人的运动范围 问题描述: 地上有一个m行n列的方格,从坐标 [0,0] 到坐标 [m-1,n-1] 。一个机器人从坐标 [0, 0] 的格子开始移动,它每次可以向左、右、上、下移动一格(不能移动到方格外),
下一篇 
锁
锁的两种实现可重入锁可以多次加锁, 加锁几次就要解锁几次 读写锁读写分离, 提高多线程的并发读的能力. 不同线程: 读锁和写锁互斥, 写锁和写锁互斥. 同一线程: 支持锁降级, 不支持锁升级(写锁级别比读锁高, 获取到写锁后可以继续获取
2023-03-14
  目录