无锁并发
使用 CAS 代替锁
案例演示
public interface Account {
int getBalance();
void withdraw(int amount);
}
public class ThreadUnsafeAccount implements Account {
private int balance;
public ThreadUnsafeAccount() {
}
public ThreadUnsafeAccount(int balance) {
this.balance = balance;
}
public void withdraw(int amount) {
this.balance -= amount;
}
public int getBalance() {
return balance;
}
public void setBalance(int balance) {
this.balance = balance;
}
}
public class SynchronizedAccount implements Account {
private int balance;
public SynchronizedAccount(int balance) {
this.balance = balance;
}
@Override
public int getBalance() {
synchronized (this) {
return this.balance;
}
}
@Override
public void withdraw(int amount) {
synchronized (this) {
this.balance -= amount;
}
}
}
public class CASAccount implements Account {
private AtomicInteger balance;
public CASAccount() {
}
public CASAccount(int balance) {
this.balance = new AtomicInteger(balance);
}
public int getBalance() {
return balance.get();
}
public void setBalance(int balance) {
this.balance = new AtomicInteger(balance);
}
public void withdraw(int amount) {
while (true) {
int expect = balance.get();
int update = expect - amount;
// CAS操作
if (balance.compareAndSet(expect, update)) {
break;
}
}
}
}
public class AccountMain {
public static final int COUNT = 1000;
public static final int ACCOUNT = 20000;
public static void runDemo(Account account) {
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < COUNT; i++) {
threads.add(new Thread(() -> account.withdraw(20)));
}
long begin = System.nanoTime();
threads.forEach(Thread::start);
threads.forEach(thread -> {
try {
// 主线程中调用每一个子线程的join()方法, 主线程等待每一个子线程运行结束
thread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
long end = System.nanoTime();
System.out.println(account.getClass().getSimpleName() + ".getBalance() = " + account.getBalance());
System.out.println("cost times = " + (end - begin) / 1_000_000 + " ms");
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100; i++) {
ThreadUnsafeAccount threadUnsafeAccount = new ThreadUnsafeAccount(ACCOUNT);
SynchronizedAccount synchronizedAccount = new SynchronizedAccount(ACCOUNT);
CASAccount casAccount = new CASAccount(ACCOUNT);
runDemo(threadUnsafeAccount);
runDemo(synchronizedAccount);
runDemo(casAccount);
}
}
}
CAS 底层原理
用当前线程获取到的值和共享变量的最新值进行比较,如果相同,则通过CAS原子操作进行设置。(最终一致性)
因此,CAS 需要搭配 volatile 关键字进行使用,而 AtomicInteger 类中的值 value 则是使用了 volatile 进行修饰。
最终一致性(ABA问题)演示
线程无法感知到其它线程对共享变量的修改,在大多数情况下是没有影响的。但是如果希望判断是否有其他线程对该共享变量进行修改,那么需要额外使用版本号,即不仅比较值,而且比较版本号,每次更新版本号加1。
public class ABACAS {
private static AtomicInteger num = new AtomicInteger(1);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (true) {
int prev = num.get();
int next = prev - 1;
if (num.compareAndSet(prev, next)) {
break;
}
}
}).start();
TimeUnit.SECONDS.sleep(2);
new Thread(() -> {
while (true) {
int prev = num.get();
int next = prev + 1;
if (num.compareAndSet(prev, next)) {
break;
}
}
}).start();
TimeUnit.SECONDS.sleep(2);
//主线程无法感知到这个1经历过1->0->1的变化, 只要满足最终一致性即可
num.compareAndSet(1, 10);
System.out.println("num.get() = " + num.get());
}
}
public class ABACASV2 {
// version版本号是int类型, 如果发生整数上溢该怎么办?
private static AtomicStampedReference<String> reference = new AtomicStampedReference<>("hello", 1);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (true) {
int version = reference.getStamp();
String prev = reference.getReference();
String next = "HELLO";
if (reference.compareAndSet(prev, next, version, version + 1)) {
System.out.println(Thread.currentThread().getName() + ".getReference() = " + reference.getReference());
break;
}
}
}).start();
TimeUnit.SECONDS.sleep(2);
new Thread(() -> {
while (true) {
int version = reference.getStamp();
String prev = reference.getReference();
String next = "hello";
if (reference.compareAndSet(prev, next, version, version + 1)) {
System.out.println(Thread.currentThread().getName() + ".getReference() = " + reference.getReference());
break;
}
}
}).start();
TimeUnit.SECONDS.sleep(2);
reference.compareAndSet("hello", "HelloWorld", 1, 2);
System.out.println(Thread.currentThread().getName() + ".getReference() = " + reference.getReference());
}
}
AtomicXXX 工具类
AtomicInteger 原子整数
AtomicReference 原子引用
原子数组
AtomicReferenceFieldUpdater 字段更新器
- 通过静态方法
newUpdater()
来创建字段更新器
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@Data
@NoArgsConstructor
@AllArgsConstructor
class Student {
// CAS 相关工具类必须搭配 volatile 变量
volatile String name;
}
public class FieldUpdater {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100; i++) {
AtomicReferenceFieldUpdater<Student, String> studentNameUpdater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
Student student = new Student("root");
new Thread(() -> {
studentNameUpdater.compareAndSet(student, "root", "thread-0");
}).start();
new Thread(() -> {
studentNameUpdater.compareAndSet(student, "root", "thread-1");
}).start();
Thread.sleep(50);
System.out.println("student = " + student);
}
}
}
使用 Unsafe 类来模拟实现 AtomicInteger 原子整数类
import lombok.SneakyThrows;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class AtomicIntegerFake {
private volatile int value;
private static final Unsafe unsafe;
private static final long valueOffset;
static {
// 通过反射获取unsafe对象
try {
// 这里在JDK8之后可以直接通过Unsafe.getUnsafe()来获取Unsafe对象
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
valueOffset = unsafe.objectFieldOffset(AtomicIntegerFake.class.getDeclaredField("value"));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public AtomicIntegerFake() {
}
public AtomicIntegerFake(int value) {
this.value = value;
}
@SneakyThrows
public void compareAndSet(int prev, int next) {
while (true) {
if (unsafe.compareAndSwapInt(this, valueOffset, prev, next)) {
break;
}
}
}
/**
* 这些方法都必须通过unsafe对象直接进行调用, 而不能通过方法重用的思想调用当前类中的其它方法
*
* @param amount
*/
public void increase(int amount) {
unsafe.compareAndSwapInt(this, valueOffset, this.value, this.value + amount);
}
public void decrease(int amount) {
unsafe.compareAndSwapInt(this, valueOffset, this.value, this.value - amount);
}
public int get() {
return this.value;
}
}
使用自定义的 AtomicIntegerFake 来替换掉原有的 AtomicInteger 来进行测试
import org.example.cas.atomic.AtomicIntegerFake;
public class AtomicIntegerFakeAccount implements Account {
private AtomicIntegerFake balance;
public AtomicIntegerFakeAccount() {
}
public AtomicIntegerFakeAccount(int balance) {
this.balance = new AtomicIntegerFake(balance);
}
public int getBalance() {
return balance.get();
}
public void setBalance(int balance) {
this.balance = new AtomicIntegerFake(balance);
}
public void withdraw(int amount) {
balance.decrease(amount);
}
}