Java并发编程入门(二十)常见加锁场景和加锁工具

4,181 阅读9分钟

banner窄.png

铿然架构  |  作者  /  铿然一叶 这是铿然架构的第 65 篇原创文章

相关阅读:

Java并发编程(一)知识地图
Java并发编程(二)原子性
Java并发编程(三)可见性
Java并发编程(四)有序性
Java并发编程(五)创建线程方式概览
Java并发编程入门(六)synchronized用法
Java并发编程入门(七)轻松理解wait和notify以及使用场景
Java并发编程入门(八)线程生命周期
Java并发编程入门(九)死锁和死锁定位
Java并发编程入门(十)锁优化
Java并发编程入门(十一)限流场景和Spring限流器实现
Java并发编程入门(十二)生产者和消费者模式-代码模板
Java并发编程入门(十三)读写锁和缓存模板
Java并发编程入门(十四)CountDownLatch应用场景
Java并发编程入门(十五)CyclicBarrier应用场景
Java并发编程入门(十六)秒懂线程池差别
Java并发编程入门(十七)一图掌握线程常用类和接口
Java并发编程入门(十八)再论线程安全
Java并发编程入门(十九)异步任务调度工具CompleteFeature


1. 常见加锁场景和加锁工具

常见加锁场景和加锁工具如下:

场景读和读读和写写和写实现方式性能
1不互斥不互斥互斥多种方式,只需对写加锁则可
2不互斥互斥互斥ReentrantReadWriteLock或StampedLock
3互斥互斥互斥多种方式,对读和写均加锁

上述多种方式指的是有多种方式实现,只要能做到加锁则可,例如可以使用synchronized关键字来加锁,也可以使用ReentrantLock来实现加锁。

场景1只有写写互斥,说明能容忍读写不一致(通常是短暂的不一致),在java并发包中CopyOnWriteArrayList就是这样的实现。在开源连接池HikariPool工具中就使用了CopyOnWriteArrayList,可参考HikariPool源码(四)资源状态

场景2只有读读不互斥,这种场景为调用者不会消费掉读到的数据,仅仅是使用它,例子读取证件类型的数据,读取到数据只是为了用于选择某种证件类型,但不会消费掉它。

场景3在所有情况下均互斥,这种场景通常为读取到的数据会被消费掉,不允许下一个消费者重复读取,例如一个待处理任务队列,被多个工作线程读取并处理,A工作线程读取到后其他工作线程不能再读取。

场景1和场景2非常相似,读一旦发生后,在使用读到的数据过程中,都控制不了写,从这点看,效果是一样的。当希望读到的数据尽最大可能为最近一次写操作数据时,使用场景2的工具包更合适。(例如写操作时间稍长,读操作很频繁)

ReentrantReadWriteLock和StampedLock的区别是,StampedLock支持乐观读,性能会更高一些,但也会付出一些代价,否则只需保留一个就可以了。

2. 举例

2.1. ReentrantLock

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
    private static int TOTAL_THREAD_NUM = 3;  // 总共线程数
    private static int WRITE_THREAD_NUM = 1;   // 写线程数
    private static int READ_THREAD_NUM = TOTAL_THREAD_NUM - WRITE_THREAD_NUM; // 读线程数

    public static void main(String[] args) {
        ReentrantLockCache reentrantLockCache = new ReentrantLockCache();
        ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREAD_NUM);
        AtomicInteger value = new AtomicInteger();
        value.getAndIncrement();

        for (int i = 0; i < WRITE_THREAD_NUM; i++) {
            executorService.submit(() -> {
                while (true) {
                    int v = value.getAndIncrement();
                    reentrantLockCache.put(1001, v);
                    System.out.println(Thread.currentThread().getName() + "-写 value: " + v + " " + new Date());
                }
            });
        }

        for (int i = 0; i < READ_THREAD_NUM; i++) {
            executorService.submit(() -> {
                int callTimes = 0;
                while (true) {
                    System.out.println(Thread.currentThread().getName() + "-读 value: " + reentrantLockCache.get(1001) + " " + new Date());
                }
            });
        }
    }

    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}

class ReentrantLockCache {
    final Map<Integer, Integer> map = new HashMap<>();
    ReentrantLock readLock = new ReentrantLock();

    public void put(Integer key, Integer val) {
        try {
            readLock.lock(); // 写锁
            ReentrantLockDemo.sleep(1000);  // 短暂休眠,用于验证读写互斥
            map.put(key, val);
        } finally {
            readLock.unlock();  // 必须在finally块解锁
        }
    }

    public Integer get(Integer key) {
        try {
            readLock.lock();  // 读锁
            ReentrantLockDemo.sleep(1000);  // 短暂休眠,用于验证读写互斥
            return map.get(key);
        } finally {
            readLock.unlock();   // 必须在finally块解锁
        }
    }
}

输出结果:

pool-1-thread-1-写 value: 1 Sun Apr 19 23:32:07 GMT+08:00 2020
pool-1-thread-2-读 value: 1 Sun Apr 19 23:32:08 GMT+08:00 2020
pool-1-thread-3-读 value: 1 Sun Apr 19 23:32:09 GMT+08:00 2020
pool-1-thread-1-写 value: 2 Sun Apr 19 23:32:10 GMT+08:00 2020
pool-1-thread-2-读 value: 2 Sun Apr 19 23:32:11 GMT+08:00 2020
pool-1-thread-3-读 value: 2 Sun Apr 19 23:32:12 GMT+08:00 2020

可以看到ReentrantLock是读写互斥,读读互斥,写写互斥,如果不想对读加锁,那么取消读操作里的加锁操作则可,因此ReentrantLock可以用于场景1和场景3。

2.2. synchronized

class SynchronizedCache {
    final Map<Integer, Integer> map = new HashMap<>();

    // 这里仅用于举例,对于此场景,可以直接使用支持并发的map
    public synchronized void put(Integer key, Integer val) {
        map.put(key, val);
    }

    public synchronized Integer get(Integer key) {
        return map.get(key);
    }
}

在这里synchronized和ReentrantLock看起来功能相同,但synchronized不具备ReentrantLock的以下能力:

  1. 能够响应中断。synchronized持有锁A后,如果尝试获取锁B失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。
  2. 支持获取锁超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。
  3. 可以非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。

可见ReentrantLock具备synchronized的能力,又比它更加强大,但并不会因此就不能使用synchronized,对比上面的两个例子,在例子场景下,同样的功能,synchronized的代码更加简洁。

2.3. ReentrantReadWriteLock

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
    private static int MAX_READ_TIMES = 2000;  // 最大读取次数
    private static int TOTAL_THREAD_NUM = 10;  // 总共线程数
    private static int WRITE_THREAD_NUM = 1;   // 写线程数
    private static int READ_THREAD_NUM = TOTAL_THREAD_NUM - WRITE_THREAD_NUM; // 读线程数
    private static boolean isStop = false;
    private static long startTime = System.currentTimeMillis();

    public static void main(String[] args) {
        ReadWriteLockCache readWriteLockCache = new ReadWriteLockCache();
        ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREAD_NUM);
        AtomicInteger value = new AtomicInteger();
        value.getAndIncrement();

        for (int i = 0; i < WRITE_THREAD_NUM; i++) {
            executorService.submit(() -> {
                while (true) {
//                    sleep(20); // 模拟一段时间内没有写操作

                    int v = value.getAndIncrement();
                    readWriteLockCache.put(1001, v);
                    System.out.println(Thread.currentThread().getName() + "-写 value: " + v + " " + new Date());

                    if (isStop) {
                        break;
                    }
                }
            });
        }

        for (int i = 0; i < READ_THREAD_NUM; i++) {
            executorService.submit(() -> {
                int callTimes = 0;
                while (true) {
                    System.out.println(Thread.currentThread().getName() + "-读 value: " + readWriteLockCache.get(1001) + " " + new Date());

                    callTimes++;
                    if (callTimes == MAX_READ_TIMES) {
                        isStop = true;
                        System.out.println("cost time: " + (System.currentTimeMillis() - startTime));
                        readWriteLockCache.printTime();
                        break;
                    }
                }
            });
        }
    }

    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}

class ReadWriteLockCache {
    final Map<Integer, Integer> map = new HashMap<>();
    private AtomicInteger totalReadTimes = new AtomicInteger();
    private AtomicInteger totalWriteTimes = new AtomicInteger();
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    Lock readLock = readWriteLock.readLock();
    Lock writeLock = readWriteLock.writeLock();

    public void put(Integer key, Integer val) {
        try {
            writeLock.lock(); // 写锁
            ReadWriteLockDemo.sleep(5); // 模拟写耗时
            totalWriteTimes.getAndIncrement();
            map.put(key, val);
        } finally {
            writeLock.unlock();  // 必须在finally块解锁
        }
    }

    public Integer get(Integer key) {
        try {
            readLock.lock();  // 读锁
            totalReadTimes.getAndIncrement();
            return map.get(key);
        } finally {
            readLock.unlock();   // 必须在finally块解锁
        }
    }

    public void printTime() {
        System.out.println("Read times: " + totalReadTimes.get() + ", Write times: " + totalWriteTimes.get());
    }
}

输出结果:

pool-1-thread-10-读 value: 3233 Mon Apr 20 00:46:05 GMT+08:00 2020
pool-1-thread-10-读 value: 3233 Mon Apr 20 00:46:05 GMT+08:00 2020
cost time: 18447
Read times: 18000, Write times: 3233

可以看到读的时间相同,读和写时间不同,写和写时间不同,说明读读不互斥,读写互斥,写写互斥。

2.4. StampedLock

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.StampedLock;

public class StampedLockDemo {
    private static int MAX_READ_TIMES = 2000;  // 最大读取次数
    private static int TOTAL_THREAD_NUM = 10;  // 总共线程数
    private static int WRITE_THREAD_NUM = 1;   // 写线程数
    private static int READ_THREAD_NUM = TOTAL_THREAD_NUM - WRITE_THREAD_NUM; // 读线程数
    private static boolean isStop = false;
    private static long startTime = System.currentTimeMillis();


    public static void main(String[] args) {
        StampedLockCache stampedLockCache = new StampedLockCache();
        ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREAD_NUM);
        AtomicInteger value = new AtomicInteger();
        value.getAndIncrement();

        for (int i = 0; i < WRITE_THREAD_NUM; i++) {
            executorService.submit(() -> {
                while (true) {
//                    sleep(20); // 模拟一段时间内没有写操作

                    int v = value.getAndIncrement();
                    stampedLockCache.put(1001, v);
                    System.out.println(Thread.currentThread().getName() + "-写 value: " + v + " " + new Date());

                    if (isStop) {
                        break;
                    }
                }
            });
        }

        for (int i = 0; i < READ_THREAD_NUM; i++) {  // 并发数多才能明显体现StampedLock的优势,少的话可能是劣势
            executorService.submit(() -> {
                int callTimes = 0;
                while (true) {
                    System.out.println(Thread.currentThread().getName() + "-读 value: " + stampedLockCache.get(1001) + " " + new Date());

                    callTimes++;
                    if (callTimes == MAX_READ_TIMES) {
                        isStop = true;
                        System.out.println("cost time: " + (System.currentTimeMillis() - startTime));
                        stampedLockCache.printTime();
                        break;
                    }
                }
            });
        }
    }

    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}

class StampedLockCache {
    final Map<Integer, Integer> map = new HashMap<>();
    StampedLock stampedLock = new StampedLock();
    private AtomicInteger totalReadTimes = new AtomicInteger();
    private AtomicInteger totalWriteTimes = new AtomicInteger();
    private AtomicInteger conflictTimes = new AtomicInteger();

    public void put(Integer key, Integer val) {
        long stamp = stampedLock.writeLock();
        try {
            StampedLockDemo.sleep(5);  // 模拟写耗时
            totalWriteTimes.getAndIncrement();
            map.put(key, val);
        } finally {
            stampedLock.unlock(stamp);  // 必须在finally块解锁
        }
    }

    public Integer get(Integer key) {
        long stamp = stampedLock.tryOptimisticRead();  // 乐观读
        Integer val = map.get(key);
        totalReadTimes.getAndIncrement();
        if (!stampedLock.validate(stamp)) {        // 判断读的过程中是否有写操作,有则重新读取
            conflictTimes.getAndIncrement();
            try {
                stamp = stampedLock.readLock();    // 升级为悲观读锁
                val = map.get(key);
            } finally {
                stampedLock.unlockRead(stamp);   // 必须在finally块解锁
            }
        }

        return val;
    }

    public void printTime() {
        System.out.println("Read times: " + totalReadTimes.get() + ", Write times: " + totalWriteTimes.get() +
                ", Conflict times: " + conflictTimes.get());
    }
}

输出结果:

pool-1-thread-8-读 value: 2140 Mon Apr 20 00:46:22 GMT+08:00 2020
pool-1-thread-8-读 value: 2140 Mon Apr 20 00:46:22 GMT+08:00 2020
cost time: 12264
Read times: 18000, Write times: 2140, Conflict times: 13747

可以看到StampedLock的性能比ReentrantReadWriteLock高,总共读取次数18000次,有13747次和写冲突,也即只有13747次需要对读操作加锁,而ReentrantReadWriteLock是每次都要对读操作加锁,额外的加锁操作增加了开销。

需要注意的是,虽然在这里StampedLock性能更高,但是StampedLock也有一些使用约束:

  1. 不支持重入
  2. 悲观读锁、写锁都不支持条件变量

另外将两个例子的总线程数均修改为3,输出结果如下:

//ReentrantReadWriteLockDemo
pool-1-thread-2-读 value: 1667 Mon Apr 20 00:53:13 GMT+08:00 2020
pool-1-thread-2-读 value: 1667 Mon Apr 20 00:53:13 GMT+08:00 2020
cost time: 9803
Read times: 4000, Write times: 1667

//StampedLockDemo
pool-1-thread-2-读 value: 1817 Mon Apr 20 00:53:32 GMT+08:00 2020
pool-1-thread-2-读 value: 1817 Mon Apr 20 00:53:32 GMT+08:00 2020
cost time: 10402
Read times: 4000, Write times: 1817, Conflict times: 3110

可见,StampedLock在高并发下才有性能优势。

3. 总结

  1. 不同的场景可以使用不同的加锁工具,使用何种工具,首先要确定使用场景。
  2. synchronized并没有完全被ReentrantLock替代,而是由使用场景决定,synchronized的优点是代码简洁。
  3. StampedLock的性能优势在高并发下才有效果,如果并发数小,则StampedLock的性能并不比ReentrantReadWriteLock高,同时,StampedLock有使用约束,需要注意。

.end


<--阅过留痕,左边点赞!