ConcurrentHashMap源码分析

401 阅读6分钟

今天来介绍大名鼎鼎的ConcurrentHashMap,众所周知,Java.Utils.Concurrent包出现后,就立马成为高并发的利器,而靠一己之力把此包写出来的Doug Lea,则更是高并发大神。此篇文章仅仅限于描述ConcurrentHashMap冰山一角,并不能对其全面剖析,如果有读者想要对并发进行更深入的理解与交流,推荐《Java并发编程的艺术》,笔者看完很有感悟。

此文还是从最简单也是最常用的get,put方法来进行剖析,进而逐步抽丝剥茧,分析此类的全貌。理解此文章需要读者有HashMap的基础,且建议读者在阅读此文章时,脑中一定要两个甚至多个线程的概念,切勿以单线程模型来思考。 从注释可以得知所有参数都不可以为null. 与HashMap不同

All arguments to all task methods must be non-null

ConcurrentHashMap.put()

/**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * <p>The value can be retrieved by calling the {@code get} method
     * with a key that is equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with {@code key}, or
     *         {@code null} if there was no mapping for {@code key}
     * @throws NullPointerException if the specified key or value is null
     */
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

可以看到put函数只是简单调用了putVal这个函数,那么继续往下

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();//参数检测异常
        int hash = spread(key.hashCode()); //Rehash
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {//死循环
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();//第一次进来,初始化table
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//1 此位置上还没有元素插入,则利用cas锁,
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//2 
                tab = helpTransfer(tab, f);
            else { // 3 
                V oldVal = null;
                synchronized (f) {//3.1
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {//3.2
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

由上图源码可知,死循环+四个if else就是整个put函数的核心实现。四个if else分别对应如下功能

  • 如果table为空,则初始化table(可以理解为一个数组,其实此处初始化也别有洞天,因为要防止多个线程同时初始化,有兴趣的读者可以自己去研究一下,看看Doug Lea是用了什么方式,防止table被多次初始化)

  • 注释1处的,如果数组对应的索引位置处还没有元素,则利用casTabAt进行放置key,value 此处主要有两点需要注意: 1 tabAt是利用的unSafe类里的getObjectVolatile(),熟悉Volatile关键字的同学肯定知道,这是获取该对象在内存中最新的值。(即同时有多个线程修改此变量,则JVM happen-before原则能够帮助我们获取到最新的值)。该对象即是table对应索引处的node对象。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

2 如果casTabAt成功,则put成功,break掉死循环。 casTabAt()也是利用了UnSafe类里的compareAndSwapObject函数,关于此类可以多说几句,此类是Java用来利用CAS锁机制而现实的一个接口类。(各位同学一定要弄明白,CAS锁其实不需要上层做任何操作,它的可靠性是由底层硬件指令来保证的,上层只是调用),返回一个boolean。即如果插入失败,则重试。为什么会插入失败,各位可以思考一下。

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }


    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
  • 3 注释2处,此处判断是否正处于转移中,如果要插入的位置,正在转移,也就是整个table正处于扩容阶段,则帮助其转移。

  • 4 注释3处,Put函数的核心。如果走到此分支,则证明

    • table已然初始化过。
    • table对应索引位置上已经有元素
    • table并没有扩容或者转移 所以,此时,我们可以进行插入,那如何确保插入操作的原子性呢?注意3.1处的synchronized 关键字,将f作为锁,保证原子性。下面又进行了一次判断,为什么在此处需要判断呢?大家想想单例模式的DCL 应该就能懂了。继续往下看,里面又有三个if else的判断,此处由于篇幅关系,仅分析第一个if(fh >= 0),剩下两个if else读者可自行分析(相对比较简单)。 注释3.2处可知,又是个死循环,此处的逻辑与HashMap的大概相同,如果找到key相同的元素,则替换其Value。如果没找到,则将其加入到链表的最后一位。跳出内层死循环,判断查询次数是否大于阈值,然后跳出外层死循环,返回。

到此整个ConcurrentHashMap的Put函数分析就结束了,是不是很简单呢?那是因为我们没有分析put函数里两个较为硬核的addCount()与helpTransfer()函数。

总结 ConcurrentHashMap put函数的精髓就在于利用CAS替换所在位置,与锁住链表表头(或者是红黑树的root节点),进行修改。如果有同学接触过数据库,则会联系到此实现类似于数据库的行级锁。其优点是,降低的锁的粒度,提高了并发的效率。其缺点 则是非绝对的线程安全。

“当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替进行,并且在主调
代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么称这个类是绝对的线程安全的。”

接下来继续看Get函数。

ConcurrentHashMap.get()

 /**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)//注释1
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

额,到这里感觉没什么可讲的,因为读模式的话基本不涉及到线程变量冲突的问题,所以大家也可以看到get函数跟hashmap的get函数相差并不大。 也是简单的条件判断,然后查询key对应的node是否存在。 稍微有点区别的就是注释1处了。