HikariCP源码阅读(二)ConcurrentBag与FastList

1,860 阅读8分钟

前言

本章学习HikariCP的两个容器类ConcurrentBagFastList。看看ConcurrentBag与传统阻塞队列相比的优势,比较FastList和JDK的ArrayList的异同,想一想为什么HikariCP需要用这两个特制的容器类。

一、IConcurrentBagEntry

IConcurrentBagEntryConcurrentBag容器里存放的元素。它有四个状态:

  • STATE_NOT_IN_USE:未使用。可以被借走
  • STATE_IN_USE:正在使用。
  • STATE_REMOVED:被移除,只有调用remove方法时会CAS改变为这个状态,修改成功后会从容器中被移除。
  • STATE_RESERVED:被保留,不能被使用。往往是移除前执行保留操作。
public interface IConcurrentBagEntry {
  int STATE_NOT_IN_USE = 0;
  int STATE_IN_USE = 1;
  int STATE_REMOVED = -1;
  int STATE_RESERVED = -2;
  boolean compareAndSet(int expectState, int newState);
  void setState(int newState);
  int getState();
}

二、ConcurrentBag的构造方法与成员变量

成员变量

public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable {
   private final CopyOnWriteArrayList<T> sharedList;
   private final boolean weakThreadLocals;
   private final ThreadLocal<List<Object>> threadList;
   private final IBagStateListener listener;
   private final AtomicInteger waiters;
   private volatile boolean closed;
   private final SynchronousQueue<T> handoffQueue;
}
  • CopyOnWriteArrayList sharedList:保存容器内所有元素,使用CopyOnWriteArrayList,写操作加锁并复制底层数据,适用于读多写少的场景。HikariCP作者建议连接池最大连接数与最小连接数保持一致,这可能也是其中一个原因。如果连接池配置为弹性容量,遇到突发流量,sharedList扩张就导致CopyOnWriteArrayList加锁并做数组拷贝;流量过后,sharedList收缩也会导致加锁和数组拷贝。

  • boolean weakThreadLocals:对于threadList中的元素是否使用WeakReference保存,默认否。

  • ThreadLocal threadList:当前线程持有的元素。可以认为sharedList包含了所有的threadList里的元素。

  • IBagStateListener listener:通知外部往ConcurrentBag加入元素。

  • AtomicInteger waiters:当前等待获取元素的线程数量。

  • boolean closed:标记ConcurrentBag是否关闭,默认false。当关闭后ConcurrentBag无法添加新元素。

  • SynchronousQueue handoffQueue:交接队列。主要用到SynchronousQueue的两个方法offer(当没有线程获取走offer的元素时返回false)和poll(timeout,unit)(指定时间内没有获取到元素时返回null)。

构造方法

public ConcurrentBag(final IBagStateListener listener) {
  // IBagStateListener必须传入
  this.listener = listener;
  // threadList是否使用WeakReference保存元素
  this.weakThreadLocals = useWeakThreadLocals();
  // 交接队列,fair=true
  this.handoffQueue = new SynchronousQueue<>(true);
  // 等待线程数量
  this.waiters = new AtomicInteger();
  // 保存容器内所有元素
  this.sharedList = new CopyOnWriteArrayList<>();
  if (weakThreadLocals) {
  	 // 如果使用WeakReference,用ArrayList
     this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
  } else {
	 // 否则使用FastList,默认
     this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
  }

  private boolean useWeakThreadLocals() {
      try {
      	 // 如果系统变量(-D参数或环境变量)有配置com.zaxxer.hikari.useWeakReferences,走系统变量配置
      	 // 这个没有标注在文档里,因为一般不建议修改
         if (System.getProperty("com.zaxxer.hikari.useWeakReferences") != null) { of WeakReference behavior
            return Boolean.getBoolean("com.zaxxer.hikari.useWeakReferences");
         }
         // 如果当前类加载器和系统类加载器不一致,返回true
         return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
      } catch (SecurityException se) {
         return true;
      }
   }
}

三、add新增元素

Add a new object to the bag for others to borrow. 新增一个元素到bag里,供borrow方法借出。

private volatile boolean closed;
private final CopyOnWriteArrayList<T> sharedList;
private final AtomicInteger waiters;
private final SynchronousQueue<T> handoffQueue;
public void add(final T bagEntry) {
  // 如果容器关闭,抛出IllegalStateException
  if (closed) {
     throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
  }
  // 放入sharedList,此时其他线程已经可以获取这个元素了
  sharedList.add(bagEntry);
  // 持续尝试将元素放入交接队列
  while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
  	 // 当前线程主动放弃cpu执行,回到就绪状态
     Thread.yield();
  }
}

重点关注waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)这个循环条件。

  • waiters.get() > 0:需要有正在等待获取元素的线程,才会循环。
  • bagEntry.getState() == STATE_NOT_IN_USE:因为元素已经放入shareList了,可能被其他线程改变状态,需要判断当前元素仍然是未使用状态。
  • !handoffQueue.offer(bagEntry):尝试放入交接队列,如果失败继续循环。

四、borrow借出元素

The method will borrow a BagEntry from the bag, blocking for the specified timeout if none are available. 从bag中借出元素,如果没有可以获取的元素,会阻塞指定时长。

private final CopyOnWriteArrayList<T> sharedList;
private final ThreadLocal<List<Object>> threadList;
private final AtomicInteger waiters;
private final SynchronousQueue<T> handoffQueue;
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
  // 1. 从线程变量中获取
  final List<Object> list = threadList.get();
  for (int i = list.size() - 1; i >= 0; i--) {
  	 // 从ThreadLocal移除
     final Object entry = list.remove(i);
     final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
     // CAS修改元素状态为使用中
     // 因为元素可能被其他线程偷取,所以要cas修改状态
     if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
        return bagEntry;
     }
  }

  // 增加等待线程数量
  final int waiting = waiters.incrementAndGet();
  try {
  	 // 2. 从共享列表里获取
     for (T bagEntry : sharedList) {
     	// CAS修改元素状态为使用中
        if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
           // 如果不止有当前线程等待,可能偷取了别人的元素,通知外部放入元素
           if (waiting > 1) {
              listener.addBagItem(waiting - 1);
           }
           return bagEntry;
        }
     }
     // 通知外部添加元素
     listener.addBagItem(waiting);
     // 超时时间
     timeout = timeUnit.toNanos(timeout);
     do {
        final long start = currentTime();
        // 3. 尝试从交接队列获取元素
        final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
        // CAS修改元素状态为使用中
        if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
           return bagEntry;
        }
        // timeout -= 本次循环消耗时间
        timeout -= elapsedNanos(start);
     } while (timeout > 10000);

     return null;
  } finally {
     // 减少等待者数量
     waiters.decrementAndGet();
  }
}

从共享列表shareList里偷取元素之后,是否能省略调用listener.addBagItem(waiting - 1)

不能,这会导致其他线程获取元素失败。参考下图的ThreadA,ThreadA因为从shareList获取元素失败,通知Listener往ConcurrentBag放入元素,但是外部元素刚被放入shareList就被窃取了,导致ThreadA从交接队列获取元素失败,最终导致超时。

五、requite归还元素

This method will return a borrowed object to the bag. Objects that are borrowed from the bag but never "requited" will result in a memory leak. requite方法归还从bag借出的元素。如果借出的元素不做归还操作,会导致内存泄露。

public void requite(final T bagEntry) {
  // 1. 设置元素状态为未使用,设置完成后可能会被其他线程抢走
  bagEntry.setState(STATE_NOT_IN_USE);
  // 2. 如果有等待者,尝试放入交接队列
  for (int i = 0; waiters.get() > 0; i++) {
  	 // 再次判断元素状态,因为可能被其他线程抢走,如果不是未使用状态直接结束
  	 // 尝试放入交接队列,如果放入成功直接结束
     if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
        return;
     }
     // 如果循环了255次,把当前线程挂起一会
     else if ((i & 0xff) == 0xff) {
        parkNanos(MICROSECONDS.toNanos(10));
     }
     // 放弃cpu
     else {
        Thread.yield();
     }
  }
  // 3. 如果没有等待者或者循环了一圈没能放入交接队列,则放入ThreadLocal
  final List<Object> threadLocalList = threadList.get();
  if (threadLocalList.size() < 50) { // 最多能放50个
     threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
  }
}

六、reserve保留与unreserve取消保留

保留状态可以先让元素状态变为不能借取之后,做一些逻辑操作,最后调用remove方法真正移除。

public boolean reserve(final T bagEntry) {
  // cas修改状态为保留
  return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
}

public void unreserve(final T bagEntry) {
  // cas修改状态为未使用
  if (bagEntry.compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
  	 // 如果有等待获取元素的线程,尝试放入交接队列
     while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
        Thread.yield();
     }
  }
  else {
     LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);
  }
}

七、remove删除元素

Remove a value from the bag. This method should only be called with objects obtained by borrow(long, TimeUnit) or reserve(T). 从bag中移除元素。只有在调用完borrow或reserve后才能调用。

public boolean remove(final T bagEntry) {
  // cas改变状态,前置状态只能是STATE_IN_USE或STATE_RESERVED
  if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
     LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
     return false;
  }
  // 先从sharedList移除
  final boolean removed = sharedList.remove(bagEntry);
  if (!removed && !closed) {
     LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
  }
  // 再从ThreadLocal里移除
  threadList.get().remove(bagEntry);

  return removed;
}

八、FastList

成员变量及构造方法

public final class FastList<T> implements List<T>, RandomAccess, Serializable {
	// 容器里元素的Class
	private final Class<?> clazz;
	// 数组
	private T[] elementData;
	// 容器中元素个数
	private int size;
	public FastList(Class<?> clazz, int capacity) {
      this.elementData = (T[]) Array.newInstance(clazz, capacity);
      this.clazz = clazz;
   }
}

add方法

首先看一下ArrayList的add方法。

public boolean add(E e) {
	// 确保数组容量
    ensureCapacityInternal(size + 1);
    // 放入元素
    elementData[size++] = e;
    return true;
}
private void ensureCapacityInternal(int minCapacity) {
    ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}
private static int calculateCapacity(Object[] elementData, int minCapacity) {
    if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
        return Math.max(DEFAULT_CAPACITY, minCapacity);
    }
    return minCapacity;
}
private void ensureExplicitCapacity(int minCapacity) {
    modCount++;

    if (minCapacity - elementData.length > 0)
        grow(minCapacity);
}
private void grow(int minCapacity) {
    int oldCapacity = elementData.length;
    int newCapacity = oldCapacity + (oldCapacity >> 1);
    if (newCapacity - minCapacity < 0)
        newCapacity = minCapacity;
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    elementData = Arrays.copyOf(elementData, newCapacity);
}

再来看一下FastList的add方法。

public boolean add(T element) {
  if (size < elementData.length) {
  	 // 如果元素个数没有超过数组长度,直接放入数组
     elementData[size++] = element;
  } else {
  	 // 否则两倍扩容,拷贝数组
     final int oldCapacity = elementData.length;
     final int newCapacity = oldCapacity << 1;
     final T[] newElementData = (T[]) Array.newInstance(clazz, newCapacity);
     System.arraycopy(elementData, 0, newElementData, 0, oldCapacity);
     newElementData[size++] = element;
     elementData = newElementData;
  }

  return true;
}

FastList和ArrayList的add方法区别:

  • ArrayList方法层级比FastList多,出入栈更频繁。
  • 由于ArrayList使用无参构造时,elementData数组变量是个空数组,需要在首次add时触发数组初始化,多了一些逻辑判断(对于Hikari来说,创建FastList和ArrayList时都传入了初始容量,这些逻辑判断都是无用的)。
  • FastList相比ArrayList去除了modCount的自增操作。
  • ArrayList扩容的计算逻辑相对复杂,考虑了很多边界条件。拷贝数组使用Arrays.copyOf方法,其底层也是调用System.arraycopy,但是调用栈很深。

remove方法

ArrayList的remove方法。

public boolean remove(Object o) {
    if (o == null) {
    	// 如果入参为空,循环通过==判断是否相等
        for (int index = 0; index < size; index++)
	        if (elementData[index] == null) {
	            fastRemove(index);
	            return true;
	        }
    } else {
        // 如果入参不为空,循环通过equals方法判断是否相等
        for (int index = 0; index < size; index++)
            if (o.equals(elementData[index])) {
                fastRemove(index);
                return true;
            }
    }
    return false;
}
 private void fastRemove(int index) {
    modCount++;
    // 计算需要移动的元素长度
    int numMoved = size - index - 1;
    // 如果移除的元素下标不是最后一位,需要往前移动下标之后的元素
    if (numMoved > 0)
        System.arraycopy(elementData, index+1, elementData, index, numMoved);
    // 最后一个元素清空
    elementData[--size] = null;
}

FastList的remove方法。

public boolean remove(Object element) {
  // 从后向前遍历
  for (int index = size - 1; index >= 0; index--) {
  	 // 只用==判断元素是否相等
     if (element == elementData[index]) {
     	// 下面这段逻辑与fastRemove一致
        final int numMoved = size - index - 1;
        if (numMoved > 0) {
           System.arraycopy(elementData, index + 1, elementData, index, numMoved);
        }
        elementData[--size] = null;
        return true;
     }
  }

  return false;
}

FastList和ArrayList的remove方法区别:

  • 遍历顺序不同。ArrayList从前向后,FastList从后向前。

为什么要从后向前遍历呢?

首先,我们希望最后一个元素匹配,因为如果最后一个元素直接匹配numMoved就是0,这样就不用做数组拷贝移位了。作者这样写的目的,一定是因为对于HikariCP,最后一个元素被移除的情况更多。

为什么对于HikariCP往往是后添加的元素优先被移除呢?

注意到borrow方法从threadList里尝试获取空闲Entry的时候,正是从FastList的最后一个元素开始remove。

为什么要从最后一个开始remove呢?

注意到requite归还连接方法作为唯一一个放入threadList元素的入口,他放入的元素都是未使用的。如果borrow方法配合着从最后一个元素开始尝试获取,那CAS修改状态的成功率是非常高的,因为之前归还的元素可能已经被别的线程从shareList窃取了!

  • 元素比较方式不同。ArrayList根据入参是否为空,分别使用==或equals判断;FastList只会使用==判断。这样基于内存地址直接比较,效率更高。

总结

HikariCP contains many micro-optimizations that individually are barely measurable, but together combine as a boost to overall performance. Some of these optimizations are measured in fractions of a millisecond amortized over millions of invocations. HikariCP包含许多微优化,这些优化几乎无法单独衡量,但结合起来可以提高整体性能。这些优化中的一些优化以毫秒为单位的几百万分摊。

  • 学习ConcurrentBag,这个容器类非常重要,是HikariCP池化资源实现的关键。
    • 四个状态的转换。
    • 几个关键方法。
    • 相比传统阻塞队列,ConcurrentBag使用了ThreadLocal来减少锁的概率。
  • 对比FastList和JDK的ArrayList的主要区别。

接下来进入HikariDataSource的学习。

原创不易,欢迎评论、点赞和关注。欢迎关注公众号:程序猿阿越。