阅读 109

MyBatis数据源模块分析

在使用 Mybatis 的时候,数据库的连接一般都会使用第三方的数据源组件,如 Druid、HikariCP、C3P0 等,其实 Mybatis 也有自己的数据源实现,可以连接数据库,还有连接池的功能

工厂模式

工厂顾名思义就是创建产品,根据产品是具体产品还是具体工厂可分为简单工厂模式和工厂方法模式,根据工厂的抽象程度可分为工厂方法模式和抽象工厂模式,MyBatis 数据源模块中为了数据源创建的便利性使用了工厂方法模式

工厂方法模式

  • 产品接口(AbStractProduct):产品接口用于定义产品类的功能,具体工厂类产生的所有产品都必须实现这个接口。调用者与产品接口直接交互,这是调用者最关心的接口
  • 具体产品类(Product):实现产品接口的实现类,具体产品类中定义了具体的业务逻辑
  • 工厂接口(AbstractFactory):工厂接口是工厂方法模式的核心接口,调用者会直接和工厂接口交互用于获取具体的产品实现类
  • 具体工厂类(ConcreteFactory): 是工厂接口的实现类,用于实例化产品对象,不同的具体工厂类会根据需求实例化不同的产品实现类

为什么要使用工厂模式?

  • 使用工厂模式将对象的创建和使用进行解耦,并屏蔽了创建对象可能的复杂过程
    • 把对象的创建和使用的过程分开,对象创建和对象使用使用的职责解耦
    • 如果创建对象的过程很复杂,创建过程统一到工厂里管理,既减少了重复代码,也方便以后对创建过程的修改维护
    • 当业务扩展时,只需要增加工厂子类,符合开闭原则

数据源的创建

MyBatis 使用了工厂模式来解决下面这几个关于数据源的问题

  1. 常见的数据源组件都实现了 javax.sql.DataSource 接口
  2. MyBatis 不但要能集成第三方的数据源组件,自身也提供了数据源的实现
  3. 一般情况下,数据源的初始化过程参数较多,比较复杂

相关类

几个相关类及部分重要代码注释

  • DataSource:数据源接口,JDBC 标准规范之一,定义了获取获取 Connection 的方法
  • DataSourceFactory:工厂接口,定义了创建 Datasource 的方法
public interface DataSourceFactory {

  /**
   * 设置DataSource的相关属性
   */
  void setProperties(Properties props);

  /**
   * 获取数据源
   */
  DataSource getDataSource();

}
复制代码

不带连接池的数据源

  • UnPooledDataSource:不带连接池的数据源,获取连接的方式和手动通过 JDBC 获取连接的方式是一样的,其中一部分代码
public class UnpooledDataSource implements DataSource {

  /**
   * 驱动类的类加载器
   */
  private ClassLoader driverClassLoader;
  /**
   * 数据库连接相关配置信息
   */
  private Properties driverProperties;
  /**
   * 缓存已注册的数据库驱动类
   */
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();

  private String driver;
  private String url;
  private String username;
  private String password;

  /**
   * 是否自动提交
   */
  private Boolean autoCommit;
  /**
   * 事务隔离级别
   */
  private Integer defaultTransactionIsolationLevel;
  /**
   * 默认网络超时时间
   */
  private Integer defaultNetworkTimeout;
  
  /**
   * 这个代码可以看出,unpooledDatasource获取连接的方式和手动获取连接的方式是一样的
   */
  private Connection doGetConnection(Properties properties) throws SQLException {
    // 初始化数据库驱动
    initializeDriver();
    Connection connection = DriverManager.getConnection(url, properties);
    // 设置事务是否自动提交,事务的隔离级别,还有默认超时时间
    configureConnection(connection);
    return connection;
  }
  
  // 省略部分代码。。
}
复制代码
  • UnpooledDataSourceFactory:工厂接口的实现类之一,用于创建 UnpooledDataSource(不带连接池的数据源)
public class UnpooledDataSourceFactory implements DataSourceFactory {

  @Override
  public void setProperties(Properties properties) {
    Properties driverProperties = new Properties();
    // 创建DataSource相应的metaObject,方便赋值
    MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
    // 遍历properties,将属性设置到DataSource中
    for (Object key : properties.keySet()) {
      String propertyName = (String) key;
      if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
        String value = properties.getProperty(propertyName);
        driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
      } else if (metaDataSource.hasSetter(propertyName)) {
        String value = (String) properties.get(propertyName);
        Object convertedValue = convertValue(metaDataSource, propertyName, value);
        metaDataSource.setValue(propertyName, convertedValue);
      } else {
        throw new DataSourceException("Unknown DataSource property: " + propertyName);
      }
    }
    // 设置DataSource.driverProperties属性
    if (driverProperties.size() > 0) {
      metaDataSource.setValue("driverProperties", driverProperties);
    }
  }
  
  // 省略其他代码
}
复制代码

带连接池的数据源

  • PoolState:用于管理 PooledConnection 对象状态的组件,通过两个 list 分别管理空闲状态的连接资源和活跃状态的连接资源
public class PoolState {

  protected PooledDataSource dataSource;

  /**
   * 空闲的连接池资源集合
   */
  protected final List<PooledConnection> idleConnections = new ArrayList<>();
  /**
   * 活跃的连接池资源集合
   */
  protected final List<PooledConnection> activeConnections = new ArrayList<>();
  /**
   * 请求的次数
   */
  protected long requestCount = 0;
  /**
   * 累计的获得连接的时间
   */
  protected long accumulatedRequestTime = 0;
  /**
   * 累计的使用连接的时间。从连接取出到归还,算一次使用的时间
   */
  protected long accumulatedCheckoutTime = 0;
  /**
   * 使用连接超时的次数
   */
  protected long claimedOverdueConnectionCount = 0;
  /**
   * 累计超时时间
   */
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  /**
   * 累计等待时间
   */
  protected long accumulatedWaitTime = 0;
  /**
   * 等待次数 
   */
  protected long hadToWaitCount = 0;
  /**
   * 无效的连接次数 
   */
  protected long badConnectionCount = 0;
 
  // 省略其他代码  
}
复制代码
  • PooledDataSource:带连接池的数据源,提高连接资源的复用性,避免频繁创建、关闭连接资源带来的开销
public class PooledDataSource implements DataSource {

  private static final Log log = LogFactory.getLog(PooledDataSource.class);

  private final PoolState state = new PoolState(this);

  private final UnpooledDataSource dataSource;

  // OPTIONAL CONFIGURATION FIELDS
  /**
   * 最大活跃连接数
   */
  protected int poolMaximumActiveConnections = 10;
  /**
   * 最大闲置连接数
   */
  protected int poolMaximumIdleConnections = 5;
  /**
   * 最大checkout时长(最长使用时间)
   */
  protected int poolMaximumCheckoutTime = 20000;
  /**
   * 无法取得连接是最大的等待时间
   */
  protected int poolTimeToWait = 20000;
  /**
   * 最多允许几次无效连接
   */
  protected int poolMaximumLocalBadConnectionTolerance = 3;
  /**
   * 测试连接是否有效的sql语句
   */
  protected String poolPingQuery = "NO PING QUERY SET";
  /**
   * 是否允许测试连接
   */
  protected boolean poolPingEnabled;
  /**
   * 配置一段时间,当连接在这段时间内没有被使用,才允许测试连接是否有效
   */
  protected int poolPingConnectionsNotUsedFor;
  /**
   * 根据数据库url、用户名、密码生成一个hash值,唯一标识一个连接池,由这个连接池生成的连接都会带上这个值
   */
  private int expectedConnectionTypeCode;

  /**
   * 回收连接资源
   */
  protected void pushConnection(PooledConnection conn) throws SQLException {

    // 加锁保证回收连接是同步的
    synchronized (state) {
      // 从活跃连接池中删除此连接
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        // 判断闲置连接池资源是否已经达到上限,没有达到上限则回收
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            // 如果还有事务没有提交,进行回滚操作
            conn.getRealConnection().rollback();
          }
          // 基于该连接,创建一个新的连接资源,并刷新连接状态,只是修改了连接状态,并未断开连接
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          // 修改老连接状态为失效(false)
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          // 唤醒其他被阻塞的线程
          state.notifyAll();
        } else {
          // 如果闲置连接池已经达到上限了,将连接真实关闭
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          // 关闭真的数据库连接,真实断开连接
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          // 将连接对象设置为无效
          conn.invalidate();
        }
      } else {
        // 连接无效,则将无效连接次数+1
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

  /**
   * 从连接池获取资源
   */
  private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    // 记录尝试获取连接的起始时间戳
    long t = System.currentTimeMillis();
    // 初始化获取到无效连接的次数
    int localBadConnectionCount = 0;

    while (conn == null) {
      // 加锁保证获取连接的同步
      synchronized (state) {
        // 检测是否有空闲连接,如果有空闲连接则直接使用
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // Pool does not have available connection(没有空闲连接时)
          // 判断活跃连接池中的数量是否大于最大连接数,没有则创建新的连接
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // Cannot create new connection(如果已经等于最大连接数,则不能创建新连接)
            // 获取最早创建的连接
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // Can claim overdue connection(检测是否已经以及超过最长使用时间)
              // 如果超时,对超时连接的信息进行统计
              // 超时连接次数+1
              state.claimedOverdueConnectionCount++;
              // 累计超时时间增加
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              // 累计的使用连接的时间增加
              state.accumulatedCheckoutTime += longestCheckoutTime;
              // 从活跃队列中删除
              state.activeConnections.remove(oldestActiveConnection);
              // 如果超时连接未提交,则手动回滚
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              // 在连接池中创建新的连接,基于该连接,创建一个新的连接资源,并刷新连接状态,只是修改了连接状态,并未断开连接
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              // 让老连接失效
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // Must wait
              // 如果没有空闲连接,最早的连接没有失效,无法创建新的连接,只能阻塞
              try {
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                // 阻塞等待指定时间
                state.wait(poolTimeToWait);
                // 累计等待时间增加
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        // 获取连接成功的,要测试连接是否有效,同时更新统计数据
        if (conn != null) {
          // ping to server and check the connection is valid or not
          // 检测连接是否有效
          if (conn.isValid()) {
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            // 连接池相关统计信息更新
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            // 如果连接无效
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            // 累计的获取无效连接次数+1
            state.badConnectionCount++;
            // 当前获取无效连接次数+1
            localBadConnectionCount++;
            conn = null;
            // 拿到无效连接,但如果没有超过重试的次数,允许再次尝试获取连接,否则抛出异常
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }

}
复制代码
  • PooledConnection:使用动态代理封装了真正的数据库连接对象,在连接使用之前和关闭时进行增强
class PooledConnection implements InvocationHandler {

  /**
   * 记录当前连接所在的数据源对象,本次连接是有这个数据源创建的,关闭后也是回到这个数据源;
   */
  private final PooledDataSource dataSource;
  /**
   * 真正的连接对象
   */
  private final Connection realConnection;
  /**
   * 连接的代理对象
   */
  private final Connection proxyConnection;
  /**
   * 从数据源取出来连接的时间戳
   */
  private long checkoutTimestamp;
  /**
   * 连接创建的的时间戳
   */
  private long createdTimestamp;
  /**
   * 连接最后一次使用的时间戳
   */
  private long lastUsedTimestamp;
  /**
   * 根据数据库url、用户名、密码生成一个hash值,唯一标识一个连接池
   */
  private int connectionTypeCode;
  /**
   * 连接是否有效
   */
  private boolean valid;

  /**
   * Required for InvocationHandler implementation.
   * 此方法专门用来增强数据库connect对象,使用前检查连接是否有效,关闭时对连接进行回收
   * @param proxy
   *          - not used
   * @param method
   *          - the method to be executed
   * @param args
   *          - the parameters to be passed to the method
   * @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])
   */
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    // 如果是调用连接的close方法,不是真正的关闭,而是回收到连接池
    if (CLOSE.equals(methodName)) {
      // 通过pooled数据源来进行回收
      dataSource.pushConnection(this);
      return null;
    }
    try {
      // 使用前要检查当前连接是否有效
      if (!Object.class.equals(method.getDeclaringClass())) {
        // issue #579 toString() should never fail
        // throw an SQLException instead of a Runtime
        checkConnection();
      }
      return method.invoke(realConnection, args);
    } catch (Throwable t) {
      throw ExceptionUtil.unwrapThrowable(t);
    }

  }
  
  // 省略其他代码
}
复制代码
  • PooledDataSourceFactory:工厂接口的实现类之一,用于创建 PooledDataSource(带连接池的数据源)
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {

  public PooledDataSourceFactory() {
    this.dataSource = new PooledDataSource();
  }

}
复制代码

总结一下连接池获取回收连接的过程

  • getConnection()

getConn

  • pushConnection()
    pushConn
关注下面的标签,发现更多相似文章
评论