阅读 318

Java之CAS无锁算法

 如果一个线程失败或挂起不会导致其他线程也失败或挂起,那么这种算法就被称为非阻塞算法。而CAS就是一种非阻塞算法实现,也是一种乐观锁技术。它能在不使用锁的情况下实现多线程之间的变量同步,所以CAS也是一种无锁算法。

1、CAS的实现

 CAS包含了3个操作数——需要读写的内存位置V、进行比较的值A和拟写入的新值B。当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会写入新值,下面来看模拟CAS的实现。

    //CAS模拟实现
    public class SimulatedCAS {
        private int value;
    
        public synchronized int get() {
            return value;
        }
    
        public synchronized int compareAndSwap(int expectedValue, int newValue) {
            int oldValue = value;
            if (oldValue == expectedValue) {
                value = newValue;
            }
            return oldValue;
        }
    
        public synchronized boolean compareAndSet(int expectedValue, int newValue) {
            return (expectedValue == compareAndSwap(expectedValue, newValue));
        }
    }
复制代码

SimulatedCAS就是模拟CAS的实现,因为这里仅仅是模拟CAS,明白其语义,所以代码还是很简单的。

    //基于CAS实现的一个线程安全计数器
    public class CasCounter {
        private SimulatedCAS value;

        public int getValue() {
            return value.get();
        }

        public int increment() {
            int v;
            do {
                v = value.get();
            } while (v != value.compareAndSwap(v, v + 1));
            return v + 1;
        }
    }
复制代码

CasCounter就是基于SimulatedCAS实现的一个线程安全的计数器,CasCounter不会阻塞,但如果其他线程同时更新更新计数器,那么将会多次执行重试操作。理论上,如果其他线程在每次竞争CAS时总是获胜,那么这个线程每次都会重试,但实际上很少发生这种类型的饥饿问题。

2、原子变量类

 Java5.0引入了底层的支持,在int、long和对象引用等类型上也都公开了CAS操作,并且JVM把它们编译为底层硬件提供的最有效方法。在支持CAS的平台,运行时把它们编译为相应的(多条)机器指令。最坏的情况下,如果不支持CAS指令,那么JVM将使用自旋锁。因此Java提供的CAS解决方案就与平台/编译器紧密相关,其性能也受平台/编译器影响。

 原子变量类(例如java.util.concurrent.atomic中的AtomicXxx)就使用了这种高效的CAS操作。

类型 具体实现类 描述
基本数据类型 AtomicInteger 原子更新Integer类型
AtomicBoolean 原子更新boolean类型
AtomicLong 原子更新long类型
数组类型 AtomicIntegerArray 原子更新Integer数组类型
AtomicLongArray 原子更新long数组类型
AtomicReferenceArray 原子更新对象数组类型
引用类型 AtomicReference 原子更新对象
AtomicStampedReference 原子更新对象,解决ABA问题
AtomicMarkableReference 原子更新对象,解决ABA问题
更新字段类型 AtomicIntegerFieldUpdater 原子更新已存在的volatile修饰的Integer类型,使用反射实现
AtomicLongFieldUpdater 原子更新已存在的volatile修饰的long类型,使用反射实现
AtomicReferenceFieldUpdater 原子更新已存在的volatile修饰的对象,使用反射实现

 上面就是Java中的所有原子变量类,基本上都比较好理解,但是个别的还是需要说明一下。

AtomicStampedReferenceAtomicMarkableReference都是为了解决ABA问题而存在的。但它们是有区别的,AtomicStampedReference是给对象引用上加一个版本号来避免ABA问题,而AtomicMarkableReference是给对象引用加上一个标记(boolean类型)来避免ABA问题的。

 更新字段类型又叫原子的域更新器,表示现有volatile的一种基于反射的“视图”,从而能够在已有的volatile域上使用CAS。它提供的原子性保证比普通原子类更弱一些,因为无法直接保证底层的域不被直接修改——compareAndSet以及其他算术方法只能确保其他使用更新器方法的线程的原子性。

 下面来看AtomicInteger的源码。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            //valueOffset可以说是value在JVM中的虚拟内存地址
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;
    //传入初始化值
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    //无参
    public AtomicInteger() {
    }

    //返回当前值
    public final int get() {
        return value;
    }
    //设置新值
    public final void set(int newValue) {
        value = newValue;
    }

    //1.6新增方法,设置新值,但不会立即刷新,性能比set方法好
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }

    //设定新值并返回旧值
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

    //比较并替换,Java CAS实现方法
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    //比较并替换,Java CAS实现方案,与compareAndSet实现一样名单不排除未来会有不同实现
    public final boolean weakCompareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    //以原子方式将当前值增加1。返回旧值
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    //以原子方式将当前值减1。返回旧值
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }

    //以原子方式将给定值添加到当前值。返回旧值
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    //以原子方式将当前值增加1。返回新值
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

    //以原子方式将当前值减1。返回新值
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }

    //以原子方式将给定值添加到当前值。返回新值
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }

    //1.8新增方法,更新并返回旧值
    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    //1.8新增方法,更新并返回新值
    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    //1.8新增方法,更新并返回旧值
    public final int getAndAccumulate(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    //1.8新增方法,更新并返回新值
    public final int accumulateAndGet(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }
    //都是些类型转换的实现
    ...
}
复制代码

valueOffset是变量在JVM中的内存地址,也就是CAS中所说的内存位置V。setlazySetcompareAndSetweakCompareAndSet这两组方法需要注意一下。

 由于valuevolatile修饰的,所以当调用set方法时,所有线程的value值会立即更新,而lazySet则不会,只有在重新写入新值的时候才会更新,而读取的还是旧值。关于这两个方法的区别可以参考J.U.C原子工具类AtomicXXX中,set和lazySet的区别AtomicInteger lazySet vs. set这两篇文章。

compareAndSetweakCompareAndSet这两个方法就比较有意思,因为实现一模一样的,那是为什么尼?在Stack Overflow上说,Java在未来可能会有不同的实现,关于这两个方法的区别可以参考java – 如果weakCompareAndSet如果实现完全像compareAndSet,会如何失败?这篇文章。

 在AtomicInteger及其他原子变量类中,都是使用Unsafe来实现CAS的,该类大部分都是native方法且不对外公开,因为是不安全的。该类与硬件相关,都是CPU指令级的操作,只有一步原子操作,而且CAS避免了请求操作系统来裁定锁的问题,不用麻烦操作系统,直接在CPU内部就搞定了,所以效率非常高且线程安全。

3、CAS原子操作的三大问题

 CAS主要有以下问题。

  • ABA问题。
  • 由于CAS是CPU指令级别的操作,所以当循环时间过长容易造成CPU高负荷。
  • 对一个变量进行操作可以,同时操作多个共享变量有点麻烦。可以通过AtomicReference来解决。

 ABA问题是CAS中的一个异常现象,来回顾一下CAS的语义,当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。那么如果先将V的值改为C然后再改为A,那么其他线程的CAS操作仍然能够成功,但这样对吗?很明显是不对的,因为V的值变化了。那么如何解决尼?其实可以在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从“A-B-A”变成了“1A-2B-3A”。

4、总结

 在Java中,CAS的运用还是比较广泛的,如对象创建的原子性、Java 1.8中ConcurrentHashMap及RxJava的多线程变量同步都是通过CAS来实现的。

【参考资料】

不可不说的Java“锁”事

JAVA CAS原理深度分析

非阻塞同步算法与CAS(Compare and Swap)无锁算法

【死磕Java并发】—-深入分析CAS

Java并发编程-无锁CAS与Unsafe类及其并发包Atomic

Psychosomatic, Lobotomy, Saw

关注下面的标签,发现更多相似文章
评论