[并发编程]-关于 CAS 的几个问题

2,358 阅读8分钟

CAS 相关基础知识

CAS的全称是Compare And Swap ,即比较交换。CAS 中一般会设计到3个参数:

  • 内存值 V
  • 旧的预期值A
  • 要修改的新值B

当且仅当预期值 A 和内存值 V 相同时,将内存值V修改为 B,否则什么都不做。

这里关于 CPU 指令对于 CAS 的支持不深入研究,有兴趣的可以自行了解。

CAS 几个问题

很多书籍和文章中都有提出它存在的几个问题:

  • 1、循环时间长开销很大
  • 2、只能保证一个共享变量的原子操作
  • 3、ABA 问题

下面就这三个问题展开来聊一下。

1、关于“循环时间长开销很大”的疑惑与验证

自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的开销。但是真的是这样吗?到底多大的并发量才造成 CAS 的自旋次数会增加呢?另外,对于当前的机器及JDK,在无锁,无CAS 的情况下,是否对于结果的影响是真的那么明显呢?对于这个问题,下面做了一个简单的测试,但是测试结果也只是针对在我本地环境下,各位看官可以拉一下代码,在自己电脑上 run 一下,把机器信息、JDK版本以及测试结果留言到评论区。

本文案例可以这里获取:glmapper-blog-sample-cas

这里我是用了一个很简单的案例,就是整数自增。使用了两种方式去测试的,一种是无锁,也不用 CAS 操作,另外一种是基于 CAS 的方式。(关于加锁的方式没有验证,有时间再补充吧~)

计数器类

计数器里面有两个方法,一种是CAS 自旋方式,一种是直接自增。代码如下:

public class Counter {
    public AtomicInteger safeCount = new AtomicInteger(0);
    public int unsafe = 0;
    // 使用自旋的方式
    public void safeCount(){
        for (;;){
            int i = safeCount.get();
            boolean success = safeCount.compareAndSet(i,++i);
            if (success){
                break;
            }
        }
    }
    // 普通方式自增
    public void unsafeCount(){
        unsafe++;
    }
}

模拟并发

这里我们模拟使用 1000 个线程,执行 30 次来看下结果,包括总耗时和结果的正确性。

  • CAS 方式
public static int testSafe() throws InterruptedException {
    // 记录开始时间
    long start = System.currentTimeMillis();
    // 实例化一个 Counter 计数器对象
    Counter counter = new Counter();
    CountDownLatch countDownLatch = new CountDownLatch(testCounts);
    for (int i =0 ;i < testCounts;i++){
        new Thread(()->{
                // 调用 safeCount 方法
                counter. safeCount();
                countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    // 结束时间
    long end = System.currentTimeMillis();
    safeTotalCostTime += (end-start);
    return counter.safeCount.get();
}
  • 普通方式
public static int testUnSafe() throws InterruptedException {
    // 记录开始时间
    long start = System.currentTimeMillis();
    // 实例化一个 Counter 计数器对象
    Counter counter = new Counter();
    CountDownLatch countDownLatch = new CountDownLatch(testCounts);
    for (int i =0 ;i< testCounts;i++){
        new Thread(()->{
            // 调用 unsafeCount 方法
            counter.unsafeCount();
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    // 结束时间
    long end = System.currentTimeMillis();
    unsafeTotalCostTime += (end-start);
    return counter.unsafe;
}
  • main 方法
public static void main(String[] args) throws InterruptedException {
    // 执行 300 次
    for (int i =0 ;i< 300;i++){
        // 普通方式
        int unSafeResult = testUnSafe();
        // cas 方式
        int safeResult = testSafe();
        // 结果验证,若果正确就将成功次数增加
        if (unSafeResult == testCounts){
            totalUnSafeCount++;
        }
        // 同上
        if (safeResult == testCounts){
            totalSafeCount++;
        }
    }
    System.out.println("test count = " + testCounts);
    System.out.println("非安全计数器正确个数 = " + totalUnSafeCount);
    System.out.println("非安全计数器耗时 = " + unsafeTotalCostTime);
    System.out.println("安全计数器正确个数 = " + totalSafeCount);
    System.out.println("安全计数器耗时 = " + safeTotalCostTime);
}

我的机器信息如下:

  • MacBook Pro (Retina, 15-inch, Mid 2015)
  • 处理器:2.2 GHz Intel Core i7
  • 内存:16 GB 1600 MHz DDR3

下面是一些测试数据。

1000(线程数) * 300(次数)

测试结果如下:

test count = 1000
非安全计数器正确个数 = 300
非安全计数器耗时 = 27193
安全计数器正确个数 = 300
安全计数器耗时 = 26337

居然发现不使用 CAS 的方式居然比使用自旋 CAS 的耗时要高出将近 1s。另外一个意外的点,我尝试了好几次,不使用 CAS 的情况得到的结果正确率基本也是 4 个 9 以上的比率,极少数会出现计算结果错误的情况。

3000(线程数) * 30(次数)

测试结果如下:

test count = 3000
非安全计数器正确个数 = 30
非安全计数器耗时 = 7816
安全计数器正确个数 = 30
安全计数器耗时 = 8073

这里看到在耗时上已经很接近了。这里需要考虑另外一个可能影响的点是,因为 testUnSafe 是 testSafe 之前执行的,“JVM 和 机器本身热身” 影响耗时虽然很小,但是也存在一定的影响。

5000(线程数) * 30(次数)

测试结果如下:

test count = 5000
非安全计数器正确个数 = 30
非安全计数器耗时 = 23213
安全计数器正确个数 = 30
安全计数器耗时 = 14161

随着并发量的增加,这里奇怪的是,普通自增方式所消耗的时间要高于CAS方式消耗的时间将近 8-9s 。

当尝试 10000 次时,是的你没猜错,抛出了 OOM 。但是从执行的结果来看,并没有说随着并发量的增大,普通方式错误的概率会增加,也没有出现预想的 CAS 方式的耗时要比 普通模式耗时多。

由于测试样本数据比较单一,对于测试结果没法做结论,欢迎大家将各自机器的结果提供出来,以供参考。另外就是,最近看到很多面试的同学,如果有被问道这个问题,还是需要谨慎考虑下。关于是否“打脸”还是“被打脸”还需要更多的测试结果。

CAS 到底是怎么操作的

  • CPU 指令
  • Unsafe 类

2、ABA 问题的简单复现

网上关于 CAS 讨论另外一个点就是 CAS 中的 ABA 问题,相信大多数同学在面试时如果被问到 CAS ,那么 ABA 问题也会被问到,然后接着就是怎么避免这个问题,是的套路就是这么一环扣一环的。

我相信 90% 以上的开发人员在实际的工程中是没有遇到过这个问题的,即使遇到过,在特定的情况下也是不会影响到计算结果。但是既然这个问题会被反复提到,那就一定有它导致 bug 的场景,找了一个案例供大家参考:CAS下ABA问题及优化方案

这里先不去考虑怎么去规避这个问题,我们想怎么去通过简单的模拟先来复现这个 ABA 问题。其实这个也很简单,如果你对线程交叉、顺序执行了解的话。

如何实现多线程的交叉执行

这个点实际上也是一个在面试过程中很常见的一个基础问题,我在提供的代码中给了三种实现方式,有兴趣的同学可以拉代码看下。

下面以 lock 的方式来模拟下这个场景,代码如下:

public class ConditionAlternateTest{
    private static int count = 0;
    // 计数器
    public AtomicInteger safeCount = new AtomicInteger(0);
    // lock
    private Lock lock = new ReentrantLock();
    // condition 1/2/3 用于三个线程触发执行的条件
    Condition c1 = lock.newCondition();
    Condition c2 = lock.newCondition();
    Condition c3 = lock.newCondition();
    // 模拟并发执行
    CountDownLatch countDownLatch = new CountDownLatch(1);
    // 线程1 ,A 
    Thread t1 = new Thread(()-> {
        try {
            lock.lock();
            while (count % 3 != 0){
                c1.await();
            }
            safeCount.compareAndSet(0, 1);
            System.out.println("thread1:"+safeCount.get());
            count++;
            // 唤醒条件2
            c2.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    });
     // 线程2 ,B 
    Thread t2 = new Thread(()-> {
        try {
            lock.lock();
            while (count % 3 != 1){
                c2.await();
            }
            safeCount.compareAndSet(1, 0);
            System.out.println("thread2:"+safeCount.get());
            count++;
            // 唤醒条件3
            c3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    });
    // 线程2 ,A
    Thread t3 = new Thread(()-> {
        try {
            lock.lock();
            while (count % 3 != 2){
                c3.await();
            }
            safeCount.compareAndSet(0, 1);
            System.out.println("thread3:"+safeCount.get());
            count++;
            // 唤醒条件1
            c1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    });
    // 启动启动线程
    public void threadStart() {
        t3.start();
        t1.start();
        t2.start();
        countDownLatch.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionAlternateTest test = new ConditionAlternateTest();
        test.threadStart();
        test.countDownLatch.await();
    }
}

执行结果:

thread1:1
thread2:0
thread3:1

上面线程交叉的案例实际上并不是严格意义上的 ABA 问题的复现,这里仅是模拟下产生的一个最简单的过程。如果大家有好的案例,也可以分享一下。

ABA 问题解决

常见实践:“版本号”的比对,一个数据一个版本,版本变化,即使值相同,也不应该修改成功。

java 中提供了 AtomicStampedReference 这个类来解决这个 ABA 问题。 AtomicStampedReference 原子类是一个带有时间戳的对象引用,在每次修改后,AtomicStampedReference 不仅会设置新值而且还会记录更改的时间。当 AtomicStampedReference 设置对象值时,对象值以及时间戳都必须满足期望值才能写入成功,这也就解决了反复读写时,无法预知值是否已被修改的窘境。

实现代码这里就不贴了,基于前面的代码改造,下面贴一下运行结果:

thread1,第一次修改;值为=1
thread2,已经改回为原始值;值为=0
thread3,第二次修改;值为=1

3、只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用 CAS 的方式来保证原子操作,但是对于对多个变量操作时,循环 CAS 就无法保证操作的原子性了,那么这种场景下,我们就需要使用加锁的方式来解决。