Java并发5:ConcurrentHashMap

2,021 阅读9分钟

为什么要使用 ConcurrentHashMap

HashMap 是非线程安全的,put操作可能导致死循环。其解决方案有 HashTable 和 Collections.synchronizedMap(hashMap) 。这两种方案都是对读写加锁,独占式,效率比较低下。

HashMap 在并发执行put操作时会引起死循环,因为多线程导致 HashMap 的 Entry 链表形成环形数据结构,则 Entry 的 next 节点永远不为空,会死循环获取 Entry。

HashTable 使用 synchronized 来保证线程安全,但是在线程竞争激烈的情况下,效率非常低。其原因是所有访问该容器的线程都必须竞争一把锁。

针对上述问题,ConcurrentHashMap 使用锁分段技术,容器里有多把锁,每一把锁用于其中一部分数据,当多线程访问不同数据段的数据时,线程间就不会存在锁的竞争。

ConcurrentHashMap 实现(JDK 1.7)

在JDK 1.7中,ConcurrentHashMap 采用了 Segment 数组和 HashEntry 数组的方式进行实现。其中 Segment 是一种可重入锁(ReentrantLock),扮演锁的角色。而 HashEntry 则是用于存储键值对的数据。结构如下图所示:

一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素。每个 Segment 守护一个 HashEntry 数组的元素。

初始化

初始化时,计算出 Segment 数组的大小 ssize 和每个 Segment 中 HashEntry 数组的大小 cap,并初始化 Segment 数组的第一个元素。其中 ssize 为2的幂次方,默认为16,cap 大小也是2的幂次方,最小值为2。最终结果根据初始化容量 initialCapacity 计算。

//计算segment数组长度
if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //初始化segmentShift和SegmentMask
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        //计算每个Segment中HashEntry数组大小cap
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // 初始化segment数组和segment[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;

首先,初始化了 segments 数组,其长度 ssize 是通过 concurrencyLevel 计算得出的。需要保证ssize的长度是2的N次方,segments 数组的长度最大是65536。

然后初始化了 segmentShift 和 segmentMask 这两个全局变量,用于定位 segment 的散列算法。segmentShift 是用于散列运算的位数, segmentMask 是散列运算的掩码。

之后根据 initialCapaicity 和 loadfactor 这两个参数来计算每个 Segment 中 HashEntry 数组的大小 cap。

最后根据以上确定的参数,初始化了 segment 数组以及 segment[0]。

get操作

整个 get 操作过程都不需要加锁,因此非常高效。首先将 key 经过 Hash 之后定位到 Segment,然后再通过一个 Hash 定位到具体元素。不需要加锁是因为 get 方法将需要使用的共享变量都定义成 volatile 类型,因此能在线程之间保持可见性,在多线程同时读时能保证不会读到过期的值。

put操作

put 方法需要对共享变量进行写入操作,为了线程安全,必须加锁。 put 方法首先定位到 Segment,然后在 Segment 里进行插入操作。插入操作首先要判断是否需要对 Segment 里的 HashEntry 数组进行扩容,然后定位添加元素的位置,将其放入到 HashEntry 数组。

Segment 的扩容比 HashMap 更恰当,因为后者是插入元素后判断是否已经到达容量,如果到达了就扩容,但是可能扩容后没有插入,进行了无效的扩容。Segment 在扩容时,首先创建一个原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组。同时为了高效, ConcurrentHashMap 不会对整个容器进行扩容,而是只对某个 segment 进行扩容。

size方法

每个 Segment 都有一个 volatile 修饰的全局变量 count ,求整个 ConcurrentHashMap 的 size 时很明显就是将所有的 count 累加即可。但是 volatile 修饰的变量却不能保证多线程的原子性,所有直接累加很容易出现并发问题。但是如果在调用 size 方法时锁住其余的操作,效率也很低。

ConcurrentHashMap 的做法是先尝试两次通过不加锁的方式进行计算,如果两次结果相同,说明结果正确。如果计算结果不同,则给每个 Segment 加锁,进行统计。

ConcurrentHashMap 实现(JDK 1.8)

在JDK 1.8中,改变了分段锁的思路,采用了 Node数组 + CAS + Synchronized 来保证并发安全。底层仍然采用了数组+链表+红黑树的存储结构。

Node

在JDK 1.8中,使用 Node 替换 HashEntry,两者作用相同。在 Node 中, val 和 next 两个变量都是 volatile 修饰的,保证了可见性。

使用 table 变量存放 Node 节点数组,默认为 null, 默认大小为16,且每次扩容时大小总是2的幂次方。在扩容时,使用 nextTable 存放新生成的数据,数组为 table 的两倍。

ForwardingNode 是一个特殊的 Node 节点,hash 值为-1,存储了 nextTable 的引用。只有table 发生扩容时,其发生作用,作为占位符放在 table 中表示当前节点为null或者已经被移动。

TreeNode

在 HashMap 中,其核心的数据结构是链表。而在 ConcurrentHashMap 中,如果链表的数据过长会转换为红黑树来处理。通过将链表的节点包装成 TreeNode 放在 TreeBin 中,然后经由 TreeBin 完成红黑树的转换。

TreeBin

TreeBin 不负责键值对的包装,用于在链表转换为红黑树时包装 TreeNode 节点,用来构建红黑树。

初始化:initTable()

在构造函数中,ConcurrentHashMap 仅仅设置了一些参数。当首次插入元素时,才通过 initTable() 方法进行了初始化。

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        //有其他线程在初始化,挂起当前线程
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
        //获得了初始化的权利,使用CAS将sizeCtl设置为-1,表示本线程正在初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            //进行初始化
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //下次扩容的大小,相当于0.75*n
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

该方法的关键为sizeCtl。

sizeCtl:控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同:

  • 负数代表正在进行初始化或扩容操作
  • -1代表正在初始化
  • -N 表示有N-1个线程正在进行扩容操作
  • 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小

sizeCtl 默认为0。如果该值小于0,表示有其他线程在初始化,需要暂停该线程。如果该线程获取了初始化的权利,先将其设置为-1。最后将 sizeCtl 设置为 0.75*n,表示扩容的阈值。

put操作

put操作的核心思想依然是根据 hash 计算节点插入在 table 的位置,如果为空,直接插入,否则插入到链表或树中。

首先计算hash值,然后进入循环中遍历table,尝试插入。

int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
//详细代码接下来分别讲述
}

首先判断 table 是否为空,如果为空或者是 null,则先进行初始化操作。

if (tab == null || (n = tab.length) == 0)
                tab = initTable();

如果已经初始化过,且插入的位置没有节点,直接插入该节点。使用CAS尝试插入该节点。

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }

如果有线程在扩容,先帮助扩容。

//当前位置的hashcode等于-1,需要扩容
 else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);

如果都不满足,使用 synchronized 锁写入数据。根据数据结构的不同,如果是链表则插入尾部;如果是树节点,使用树的插入操作。

else {
V oldVal = null;
//对该节点进行加锁处理(hash值相同的链表的头节点)
synchronized (f) {
    if (tabAt(tab, i) == f) {
        //fh > 0 表示为链表,将该节点插入到链表尾部
        if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                //hash 和 key 都一样,替换value
                if (e.hash == hash &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    //putIfAbsent()
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                //链表尾部  直接插入
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                            value, null);
                    break;
                }
            }
        }
        //树节点,按照树的插入操作进行插入
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                    value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}

在for循环的最后,判断链表的长度是否需要链表转换为树结构。

if (binCount != 0) {
    // 如果链表长度已经达到临界值8,把链表转换为树结构
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}

最后,如果是更新节点,前边已经返回了 oldVal,否则就是插入新的节点。还需要使用 addCount() 方法,为 size 加一。

总结步骤如下:

  1. 判断 key 和 value 是否为 null,如果是则抛出异常
  2. 计算 hash
  3. 遍历 table,插入节点
    • 如果 table 为空,进行初始化
    • 插入位置为空,直接插入,无需加锁
    • 如果是 ForwardingNode 节点,表示有其他线程正在扩容,帮助线程一起进行扩容。
    • 如果是链表结构,遍历链表,如果存在 key 则更新 value,否则插入到链表尾部;如果是 TreeBin 节点,按照红黑树的方法更新或增加节点。
    • 如果完成后发现链表长度大于设定的阈值,将其装换为红黑树
  4. 如果是更新,返回oddVal;如果是插入,使用 addCount() 方法,增加 size, 返回 null。

get方法

  1. 计算 hash 值
  2. 判断 table 是否为空,为空返回 null
  3. 根据 hash 获取 Node 节点,如果是该节点则返回 value
  4. 根据链表或红黑树查找到对应节点,返回 value
  5. 找不到则返回 null

参考资料