Java并发编程-锁及并发容器

774

锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而JavaSE5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

重入锁ReentrantLock

百度图片搜索
百度图片搜索

重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。ReentrantLock是java.unti.concurrent包下的一个类,它的一般使用结构如下所示:

public void lockMethod() {  
    ReentrantLock myLock = new ReentrantLock();  
    myLock.lock();  
    try{  
        // 受保护的代码段  
        //critical section  
    } finally {  
        // 可以保证发生异常 锁可以得到释放 避免死锁的发生  
        myLock.unlock();  
    }  
}

ReentrantLock与synchronized的比较

  • 相同:ReentrantLock提供了synchronized类似的功能和内存语义。
  • 不同:
  1. ReentrantLock功能性方面更全面,比如时间锁等候,可中断锁等候,锁投票等,因此更有扩展性。在多个条件变量和高度竞争锁的地方,用ReentrantLock更合适,ReentrantLock还提供了Condition,对线程的等待和唤醒等操作更加灵活,一个ReentrantLock可以有多个Condition实例,所以更有扩展性。
  2. ReentrantLock 的性能比synchronized会好点。
  3. ReentrantLock提供了可轮询的锁请求,他可以尝试的去取得锁,如果取得成功则继续处理,取得不成功,可以等下次运行的时候处理,所以不容易产生死锁,而synchronized则一旦进入锁请求要么成功,要么一直阻塞,所以更容易产生死锁。

公平性

在Java的ReentrantLock构造函数中提供了两种锁:创建公平锁和非公平锁(默认)。代码如下:

  public ReentrantLock() {

       sync = new NonfairSync();

}

 public ReentrantLock(boolean fair) {
          sync = fair ? new FairSync() : new NonfairSync();
    }

在公平的锁上,线程按照他们发出请求的顺序获取锁,但在非公平锁上,则允许‘插队’:当一个线程请求非公平锁时,如果在发出请求的同时该锁变成可用状态,那么这个线程会跳过队列中所有的等待线程而获得锁。

非公平锁性能高于公平锁性能的原因:
在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。

读写锁ReentrantReadWriteLock

之前提到锁基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。

一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写
的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock

百度图片搜索
百度图片搜索

public class Cache {
static Map<String, Object> map = new HashMap<String, Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
// 获取一个key对应的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 设置key对应的value,并返回旧的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
// 清空所有的内容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}

Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的
读锁和写锁来保证Cache是线程安全的。在读操作get(String key)方法中,需要获取读锁,这使
得并发访问该方法时不会被阻塞。写操作put(String key,Object value)方法和clear()方法,在更新
HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而
只有写锁被释放之后,其他读写操作才能继续。

Condition接口

Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。

调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用

  • Conditon中的await()对应Object的wait()
  • Condition中的signal()对应Object的notify()
  • Condition中的signalAll()对应Object的notifyAll()
public class ConTest {  

     final Lock lock = new ReentrantLock();  
     final Condition condition = lock.newCondition();  

    public static void main(String[] args) {  
        // TODO Auto-generated method stub  
        ConTest test = new ConTest();  
        Producer producer = test.new Producer();  
        Consumer consumer = test.new Consumer();  


        consumer.start();   
        producer.start();  
    }  

     class Consumer extends Thread{  

            @Override  
            public void run() {  
                consume();  
            }  

            private void consume() {  

                    try {  
                           lock.lock();  
                        System.out.println("我在等一个新信号"+this.currentThread().getName());  
                        condition.await();  

                    } catch (InterruptedException e) {  
                        // TODO Auto-generated catch block  
                        e.printStackTrace();  
                    } finally{  
                        System.out.println("拿到一个信号"+this.currentThread().getName());  
                        lock.unlock();  
                    }  

            }  
        }  

     class Producer extends Thread{  

            @Override  
            public void run() {  
                produce();  
            }  

            private void produce() {                   
                    try {  
                           lock.lock();  
                           System.out.println("我拿到锁"+this.currentThread().getName());  
                            condition.signalAll();                             
                        System.out.println("我发出了一个信号:"+this.currentThread().getName());  
                    } finally{  
                        lock.unlock();  
                    }  
                }  
     }  

}

执行结果:

我在等一个新信号Thread-1
我拿到锁Thread-0
我发出了一个信号:Thread-0
拿到一个信号Thread-1

并发容器

CopyOnWrite容器

CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器

在使用CopyOnWriteArrayList之前,我们先阅读其源码了解下它是如何实现的。以下代码是向ArrayList里添加元素,可以发现在添加的时候是需要加锁的,否则多线程写的时候会Copy出N个副本出来。

public boolean add(T e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {

        Object[] elements = getArray();

        int len = elements.length;
        // 复制出新数组

        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 把新元素添加到新数组里

        newElements[len] = e;
        // 把原数组引用指向新数组

        setArray(newElements);

        return true;

    } finally {

        lock.unlock();

    }

}

final void setArray(Object[] a) {
    array = a;
}

读的时候不需要加锁,如果读的时候有多个线程正在向ArrayList添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的ArrayList。

public E get(int index) {
    return get(getArray(), index);
}

ConcurrentHashMap的实现原理与使用

ConcurrentHashMap是线程安全且高效的HashMap。ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构,一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

为什么要使用ConcurrentHashMap

  • 线程不安全的HashMap

在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap

  • 效率低下的HashTable

HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低

  • ConcurrentHashMap的锁分段技术可有效提升并发访问率

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问

hash定位

在定位元素的代码里我们可以发现,定位HashEntry和定位Segment的散列算法虽然一样,都与数组的长度减去1再相“与”,但是相“与”的值不一样,定位Segment使用的是元素的hashcode通过再散列后得到的值的高位,而定位HashEntry直接使用的是再散列后的值。其目的是避免两次散列后的值一样,虽然元素在Segment里散列开了,但是却没有在HashEntry里散列开

hash >>> segmentShift) & segmentMask  // 定位Segment所使用的hash算法
int index = hash & (tab.length - 1);  // 定位HashEntry所使用的hash算法

get

Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散
列运算定位到Segment,再通过散列算法定位到元素,代码如下

public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}

get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空才会加锁重读。我们
知道HashTable容器的get方法是需要加锁的,那么ConcurrentHashMap的get操作是如何做到不加锁的呢?原因是它的get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值

put

由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置,然后将其放在HashEntry数组里

 public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

Segment的put方法

   final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

size

ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。
那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put、remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化

阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  1. 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不
    满。
  2. 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器

插入和移除操作的4中处理方式

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(e,time,unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue
    full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移
    除方法,则是从队列里取出一个元素,如果没有则返回null。
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者
    线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队
    列会阻塞住消费者线程,直到队列不为空。
  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程
    一段时间,如果超过了指定的时间,生产者线程就会退出

Java里的阻塞队列

JDK 7提供了7个阻塞队列,如下。

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列

  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序

  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素

  • SynchronousQueue:一个不存储元素的阻塞队列。

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素

  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法

参考资料

《Java并发编程的艺术》

ReentrantLock之公平锁与非公平锁浅析

java condition使用及分析

聊聊并发-Java中的Copy-On-Write容器

Java集合---ConcurrentHashMap原理分析