ConcurrentHashMap源码分析

1,015 阅读16分钟

设计目标

ConcurrentHashMap的设计目标是在保证并发可读性的前提下,最小化更新竞争。次要目标是使空间消耗与HashMap保持一致或者更少,并且保证空表的多线程初始化插入速率。

整体结构

ConcurrentHashMapHashMap结构相似,通常为一个桶哈希表(a binned (bucketed) hash table)。在内部维护一个数组用于保存Node节点,数组中的每一个位置代表一个桶位对应一个特定的hash值。当插入的数据出现hash冲突时,ConcurrentHashMap采用拉链法来解决hash冲突,在出现hash冲突的位置维护一个Node链表来保存这些数据。当链表上的数据过多时,查询当前位置的数据就需要遍历过多的元素会降低查询效率,所以当链表的长度过大时链表结构就会转为红黑树,以此来提高查询效率。

并发控制

ConcurrentHashMap通过cas操作与synchronized结合来保证线程安全。当更新的数据在table中的位置处为NULL时,通过Unsafe类执行cas操作来保存数据。当更新的位置处已经存在Node节点时,通过synchronized对该位置的第一个节点加锁保证线程安全,如上图在插入Node3的时候会首先锁住Node1节点然后再进行相关操作。

构造方法

/**
     * Creates a new, empty map with an initial table size based on
     * the given number of elements ({@code initialCapacity}), table
     * density ({@code loadFactor}), and number of concurrently
     * updating threads ({@code concurrencyLevel}).
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements,
     * given the specified load factor.
     * @param loadFactor the load factor (table density) for
     * establishing the initial table size
     * @param concurrencyLevel the estimated number of concurrently
     * updating threads. The implementation may use this value as
     * a sizing hint.
     * @throws IllegalArgumentException if the initial capacity is
     * negative or the load factor or concurrencyLevel are
     * nonpositive
     */
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
        //校验参数
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        //根据并发度参数设置初始化最小值
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        //通过负载因子loadFactor计算容量。ConcurrentHashMap与HashMap不同这里虽然传了负载因子参数,但是只会在此方法处用来计算实际容量,并不会使用此负载因子来进行扩容,而是使用默认的负载因子0.75。
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        //tableSizeFor的作用是将上一步计算出的size转为大于等于size的最小的2的n次方。
        //然后将此值保存到sizeCtl中,可以看到在构造方法中并没有进行存储空间的实际初始化,只是对容量进行了计算,准备在初始化时使用。
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
}

get方法

/**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //计算key的hash值
        int h = spread(key.hashCode());
        //table不为空并且hash值对应的位置不为null,获取当前位置的第一个Node节点
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //此Node节点的hash值与key的hash值一致,比较key是否相同,相同返回此值
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //第一个Node节点的hash值为负值,表示此节点正在进行扩容,这个Node节点是ForwardingNode对象,通过find方法去nextTable中去查找。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //遍历此位置的链表来查找映射的数据
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

put方法

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //计算key的hash值
        int hash = spread(key.hashCode());
        //初始化binCount用来存储当前桶位的Node数,用于判断是否转为红黑树存储
        int binCount = 0;
        //自旋
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果当前table没有初始化,调用initTable方法进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //通过hash值定位,如果当前位置是null,就可以通过cas操作直接将值保存到此位置。
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //如果当前位置存在Node节点,并且节点的hash值为-1表示正在扩容。
            else if ((fh = f.hash) == MOVED)
                //调用helpTransfer方法,尝试一起帮助扩容,避免无效等待
                tab = helpTransfer(tab, f);
            //当前位置存在Node节点,且没有进行扩容操作
            else {
                V oldVal = null;
                //尝试对当前节点加锁
                synchronized (f) {
                    //删除节点和扩容操作会出现加锁成功后节点的位置已经发生变化的情况,所以要再次判断确认
                    if (tabAt(tab, i) == f) {
                        //当前桶位的存储结构为链表,将此数据插入到链表中
                        if (fh >= 0) {
                            binCount = 1;
                            //遍历此链表,并同时记录此链表的节点数
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //此key已经有映射
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                     //如果是在不存在的时候才新增,则此数据不会更新;如果不是将此key的映射更新为传入的值。
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //当前桶位的存储结构为红黑树,将此数据插入到红黑树中
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //如果节点数不为0,说明更新成功了;反之还在自旋
                if (binCount != 0) {
                    //当前位置的数据结构为链表,且节点数量到达了转为红黑树的阀值
                    if (binCount >= TREEIFY_THRESHOLD)
                        //转为红黑树存储
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //当前容量加1,如果达到了扩容阀值需要进行扩容
        addCount(1L, binCount);
        return null;
    }

计数原理与扩容分析

ConcurrentHashMap计数的方式与LongAdder类的计数方式一致,在没有竞争时更新baseCount字段,如果出现了多线程竞争,为了避免一全部阻塞在更新baseCount操作上,维护一个数组counterCells,用来存储多个数值,出现竞争时线程通过自己的探测值来确定自己更新counterCells数组中的哪一个值,由此来降低竞争。计算合计值时将baseCount与数组counterCells中的值相加即可。

/**
     * Adds to count, and if table is too small and not already
     * resizing, initiates transfer. If already resizing, helps
     * perform transfer if work is available.  Rechecks occupancy
     * after a transfer to see if another resize is already needed
     * because resizings are lagging additions.
     * 增加数量到count,如果count到达了扩容阀值并且没有开始扩容,初始化扩容操作。如果已经有其他线程在进行扩容了,就尝试帮助一起扩容。在扩容完成后,重新检查大小是否需要继续扩容。
     * @param x the count to add 添加的数量
     * @param check if <0, don't check resize, if <= 1 only check if uncontended
     */
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //如过CounterCell数组不为空,或者cas更新baseCount字段失败,则表示更新数量存在多线程竞争。
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //如果CounterCell数组没有初始化,或者当前线程对应的CountCell为null
            if (as == null || (m = as.length - 1) < 0 ||
                //通过线程的探测值(可以理解为线程的hash值)来获取对应的CountCell,
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                //CounterCell数组已经初始化,并且线程对应的CountCell不为null,尝试cas更新此countCell的值,如果失败了执行fullAddCount方法
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //check大于0时检查是否需要扩容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //如果总数量到达了扩容阀值,进行扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                //计算调整大小为n的table的标志位   
                int rs = resizeStamp(n);
                //如果sizeCtl小于0表示目前正在进行扩容
                if (sc < 0) {
                    //判断扩容是否已经结束了,这个地方的判断在java8是有bug的,sc == rs + 1 和sc == rs + MAX_RESIZERS 这两个条件永远不可能为true。
                    //sizeCtl值的变化阶段:
                    //一:初始化扩容:sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 2
                    //二:新增扩容线程帮助一起扩容 sizeCtl = sizeCtl + 1;
                    //三:一个线程完成了自己的扩容任务 sizeCtl = sizeCtl -1;
                    //四:全部线程都完成了自己的扩容后任务 sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 1,此值为 初始化扩容值-1
                    //五:sizeCtl存储新的扩容阀值,变为正数
                    //当sizeCtl的值小于0时sizeCtl的值有这几个阶段
                    //一:第一个扩容线程刚开始扩容 sizeCtl=(rs << RESIZE_STAMP_SHIFT) + 2
                    //二:多个线程正在进行扩容 sizeCtl=(rs << RESIZE_STAMP_SHIFT) + 1 + 扩容线程个数
                    //所以此处的判断应该为
                    //一:扩容的线程都已经结束了扩容过程,此时 sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 1;
                    //二:参与扩容的线程数已经达到了扩容线程数上限,sizeCtl = (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
                    //这两种情况下当前线程都不必再参与扩容了,可以直接break。
                    再往下看如果nextTable==null或者transferIndex<=0也都表示扩容已经完成了,直接break
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //帮助扩容,cas更新sizeCtl标记当前线程一起参与扩容    
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //sizeCtl不小于0表示目前没有扩容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

    //返回n容量所对应的一个独特的扩容标记
    //把n的信息转换为 前面有多少个0,用前面0的个数来区分不同的n
    //1 << (RESIZE_STAMP_BITS - 1),获取一个第16为1其余为全为0的数,保证了左移16位后符号位为1,即为一个负数
    //然后进行或运算,将n的信息加上去,获取标记数值用于扩容计算
    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

transfer方法与helperTransfer方法

ConcurrentHashMap的扩容通过增加一个临时的数组,将原数组中的节点全部转移至新数组中完成扩容。扩容操作的转移的最小单位为一个处于table上的节点,每个线程发现自己操作的节点正在转移时都会试图参与一起扩容。通过transferIndex字段来控制,每个线程在扩容前先更新transferIndex,来标记自己处理的节点的范围,当transferIndex变为0时,意味着所有的节点都已经有线程在处理扩容转移或者已经处理完了。

/**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //nextTab为null表示扩容还未初始化
        if (nextTab == null) {            // initiating
            try {
                //创建是当前两倍容量的数组
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                //超过最大长度限制了,不能在继续扩容了
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            //待移动的数量为n
            transferIndex = n;
        }
        //新数组长度
        int nextn = nextTab.length;
        //创建转移节点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //遍历本线程分担的扩容范围的标记,如果为false说明当前分到的范围完成了,继续去获取任务,或者全部结束了
        boolean advance = true;
        //扩容完成的标记,用来在扩容全部完成之后再检查一遍所有节点
        //(这里我没想出来什么情况下,这个检查是有意义的:))
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //继续进行当前范围的扩容任务
            while (advance) {
                int nextIndex, nextBound;
                // 扩容下标小于等于边界,或者已经标记为完成,不再继续循环
                if (--i >= bound || finishing)
                    advance = false;
                //(nextIndex = transferIndex) <= 0说明整体扩容已经完成了,将i=-1,用于下面判断结束
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //当前范围扩容还在进行,cas更新nextIndex,标志本线程已经领取了此范围的扩容任务,调整当前的边界
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //如果i小于0了说明在上面的循环逻辑中transferIndex<=0,本线程的扩容任务已经完成,并且也没有未分配的扩容任务了
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //标记为完成
                if (finishing) {
                    //table赋值为nextTab,nextTable变量清空
                    nextTable = null;
                    table = nextTab;
                    //扩容阀值变为当前的1.5倍
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //如果标记为未完成,说明还有线程没有完成自己的分配任务,尝试将sizeCtl-1,标记当前线程已经完成。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //下面的判断表达式与addCount中的逻辑一致,如果为true说明所有线程都已经完成了任务,如果不等与说明还有未完成扩容的线程,本线程的扩容逻辑返回,等待其他线程完成。
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //上面表达式成立,所有线程都已经完成了任务,将finishing标记改为true,重新遍历一遍所有节点,确保所有节点都已经完成了扩容。
                    finishing = advance = true;
                    //这里将i赋值为n,上面的while逻辑每次循环都会执行--i,所以i的值就是从n到0,然后执行下面的节点检查的逻辑。
                    i = n; // recheck before commit
                }
            }
            //本线程当前的扩容任务未完成
            //1.当前节点为null,cas更新节点的值为fwd转移节点,转移节点的hash值为MOVED
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //2.判断是否已经在转移了(用于全部完成后的节点再次检查)
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                //3.节点还没有转移到nextTab,对其加锁执行转移操作
                //转移节点有一个重新计算下标的操作,由于nextTable的长度为tab的两倍,所以转移过去之后通过hash值定位的下标有可能会改变。
                //我们知道由于tab的长度length始终保持为2的n次方,所以下标是通过下面的公式计算的
                //int index = hash & (length-1);(就是hash对n的取模运算)
                //当length变为2*length时,int newIndex = hash & (length-1);
                //newIndex是否与index相等,我们分析一下位运算就可以得到以下结论
                //我们假设length二进制x位为1,当x等于6时二进制表示为00100000,
                //那么2*length二进制表示为01000000,length-1与2*length-1不同的地方就在于2*length计算出得结果第x位是1,而length第x位为0,所以hash对这两个结果进行与运算的区别就在于第x位,如果hash的x位为0,那么newIndex=index;如果hash的第x位为1,那么newIndex = index + 2^x;也就是newIndex = index + length;
                //因此在转移过程中,就会有一些节点的下标比原来大,差为length,也就是在数组的后半部分,我们把前半部分成为低位,后半部分称为高位。
                synchronized (f) {
                    //检查加锁完成后,当前位置的值是否变化了,如果变化了说明被其他线程修改了,回到上面的逻辑继续循环重试
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            //由上面的分析可知我们只需要找出hash的x位的值就可以辨别下标是否变化
                            //由于n = 2 ^ x,所以hash值与n进行与运算即为x位的值,如果为1则下标比原来多了n;
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //遍历当前节点的链表,记录最后一个下标变化的节点
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //反向遍历链表,这里的判断条件p != lastRun
                            //主要是为了尽量避免创建不必要的新Node对象,链表中有可以直接使用的链直接整体迁移至新表中,也就是上面首先正向遍历计算lastRun的目的。
                            //有上面的分析我们知道这里不是整个链表反向的,有可能会复用原来的链。由源代码开头的注释我们得治这种可能性还比较大。如下:
                            //On average, only about one-sixth of them need cloning when a table doubles.
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //下标没有变化,节点放在低位
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    //下标变化了,节点放在高位
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //cas更新低位节点链表
                            setTabAt(nextTab, i, ln);
                            //cas更新高位节点链表
                            setTabAt(nextTab, i + n, hn);
                            //原tab的位置设置转移节点标记此节点已被转移值nextTable
                            setTabAt(tab, i, fwd);
                            //继续当前扩容范围的下一个节点
                            advance = true;
                        }
                        //红黑树节点的迁移
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                //处理低位
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                //处理高位
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //检查迁移过后的数量存储格式是否需要变为链表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                //设置树的根节点
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                //设置树的根节点
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            //更新低位
                            setTabAt(nextTab, i, ln);
                            //更新高位
                            setTabAt(nextTab, i + n, hn);
                            //原tab的位置设置转移节点标记此节点已被转移值nextTable
                            setTabAt(tab, i, fwd);
                            //继续当前扩容范围的下一个节点
                            advance = true;
                        }
                    }
                }
            }
        }
    }
/**
 * 帮助扩容
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    //判断当前扩容是否已经完成
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        //获取与table的length相关的扩容标记值
        int rs = resizeStamp(tab.length);
        //再次判断扩容正在进行
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            //如果扩容已经完成,返回
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            //cas更新扩容线程数,将自己加进去
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                //执行transfer与其他线程一起执行扩容操作
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}