A look at Hibernate's session-level repeatable reads

This article examines Hibernate's session-level repeatable reads.

Example

doInTransaction(session -> {
    Product product = new Product();
    product.setId(1L);
    product.setQuantity(7L);
    session.persist(product);
});
doInTransaction(session -> {
    // First read: the entity becomes managed by this session
    final Product product = (Product) session.get(Product.class, 1L);
    try {
        // A second session in another thread changes the quantity to 6 and commits
        executeSync(() -> doInTransaction(_session -> {
            Product otherThreadProduct = (Product) _session.get(Product.class, 1L);
            assertNotSame(product, otherThreadProduct);
            otherThreadProduct.setQuantity(6L);
        }));
        // The entity query returns the already managed instance, still with quantity 7
        Product reloadedProduct = (Product) session.createQuery("from Product").uniqueResult();
        assertEquals(7L, reloadedProduct.getQuantity());
        // The SQL projection bypasses the persistence context and sees the committed 6
        assertEquals(6L,
            ((Number) session
            .createSQLQuery("select quantity from product where id = :id")
            .setParameter("id", product.getId())
            .uniqueResult())
            .longValue()
        );
    } catch (Exception e) {
        fail(e.getMessage());
    }
});
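  • This code demonstrates Hibernate's session-level repeatable reads. The reloadedProduct query returns the entity with id 1 that is already cached in the session (a SQL statement is still sent to the database, but the values in the returned ResultSet are not used), whereas the SQL projection query returns exactly what the JDBC query returns.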
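The behaviour described above can be imitated without Hibernate. The following is only a minimal sketch under stated assumptions: MiniSession, loadEntity, selectQuantity and the TABLE map are hypothetical stand-ins invented for illustration, not Hibernate's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a Hibernate Session; not Hibernate's real API.
class RepeatableReadSketch {

    static class Product {
        final long id;
        long quantity;
        Product(long id, long quantity) { this.id = id; this.quantity = quantity; }
    }

    // Stand-in for the database table: id -> quantity.
    static final Map<Long, Long> TABLE = new HashMap<>();

    static class MiniSession {
        // First-level cache: one managed instance per id.
        private final Map<Long, Product> entitiesById = new HashMap<>();

        // Entity query: the row is always read, but an already managed instance wins.
        Product loadEntity(long id) {
            Long dbQuantity = TABLE.get(id);   // the "SQL" is still executed
            if (dbQuantity == null) {
                return null;
            }
            Product managed = entitiesById.get(id);
            if (managed != null) {
                return managed;                // the row values are ignored
            }
            Product fresh = new Product(id, dbQuantity);
            entitiesById.put(id, fresh);
            return fresh;
        }

        // Projection query: returns the raw column value and bypasses the cache.
        Long selectQuantity(long id) {
            return TABLE.get(id);
        }
    }

    public static void main(String[] args) {
        TABLE.put(1L, 7L);
        MiniSession session = new MiniSession();
        Product product = session.loadEntity(1L);

        TABLE.put(1L, 6L); // simulates another transaction committing quantity = 6

        Product reloaded = session.loadEntity(1L);
        System.out.println(reloaded == product);        // true
        System.out.println(reloaded.quantity);          // 7
        System.out.println(session.selectQuantity(1L)); // 6
    }
}
```

The entity lookup keeps returning the same instance with the old value, while the projection sees the new row, which mirrors the two assertions in the test above.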

The example code comes from How does Hibernate guarantee application-level repeatable reads

write-behind

write-behind cache

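  • A write-behind cache is one kind of caching strategy. The main idea is that an update is first applied to the cache, and the cache later persists the changes to the backing store in batches (for example, batched updates to a database). The benefit is that multiple operations on the same data can be merged, which reduces I/O.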
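As a rough illustration of the idea, here is a minimal write-behind cache sketch. All names (Store, WriteBehindCache, writeBatch, flush) are hypothetical and chosen for this example only; a production cache would also need eviction, concurrency control and failure handling.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a write-behind cache; names are illustrative only.
class WriteBehindCacheSketch {

    // Stand-in for the backing store; it records every physical write.
    static class Store {
        final Map<String, Integer> data = new LinkedHashMap<>();
        final List<String> writeLog = new ArrayList<>();

        void writeBatch(Map<String, Integer> batch) {
            data.putAll(batch);
            writeLog.add("batch of " + batch.size());
        }
    }

    static class WriteBehindCache {
        private final Store store;
        private final Map<String, Integer> cache = new LinkedHashMap<>();
        private final Map<String, Integer> dirty = new LinkedHashMap<>();

        WriteBehindCache(Store store) { this.store = store; }

        // Updates touch only memory; repeated writes to one key are merged.
        void put(String key, int value) {
            cache.put(key, value);
            dirty.put(key, value);
        }

        Integer get(String key) {
            Integer cached = cache.get(key);
            return cached != null ? cached : store.data.get(key);
        }

        // Persist all pending changes in a single batch.
        void flush() {
            if (dirty.isEmpty()) {
                return;
            }
            store.writeBatch(new LinkedHashMap<>(dirty));
            dirty.clear();
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        WriteBehindCache cache = new WriteBehindCache(store);
        cache.put("a", 1);
        cache.put("a", 2); // merged with the previous write
        cache.put("b", 3);
        cache.flush();
        System.out.println(store.writeLog); // [batch of 2]
    }
}
```

Three updates result in one physical write of two entries, which is the I/O saving the strategy is after.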

transactional write-behind cache

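  • To shorten the time during which a database connection holds locks, Hibernate adopts a transactional write-behind strategy. Its persistence context plays the role of a transactional write-behind cache: changes to entities are first applied in memory and are flushed to the database at a later point. This is embodied in the Session class.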

To reduce lock contention in the database, the physical database transaction needs to be as short as possible.

Long-running database transactions prevent your application from scaling to a highly-concurrent load. Do not hold a database transaction open during end-user-level work, but open it after the end-user-level work is finished. This concept is referred to as transactional write-behind.

The persistence context acts as a transactional write-behind cache, queuing any entity state change. Like any write-behind cache, changes are first applied in-memory and synchronized with the database during the flush time.
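The queuing behaviour described in the quotes can be sketched roughly as follows. This is a simplified illustration, assuming a hypothetical UnitOfWork class that queues plain SQL strings; Hibernate's real flush mechanism is considerably more elaborate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of transactional write-behind; not Hibernate's real classes.
class TransactionalWriteBehindSketch {

    static class UnitOfWork {
        private final List<String> database;                 // stand-in for the real database
        private final List<String> pending = new ArrayList<>();

        UnitOfWork(List<String> database) { this.database = database; }

        // An entity state change is only queued in memory.
        void queue(String statement) { pending.add(statement); }

        int pendingCount() { return pending.size(); }

        // Flush time: send every queued statement, in order.
        void flush() {
            database.addAll(pending);
            pending.clear();
        }

        void commit() { flush(); }

        // Rollback before any flush: nothing ever reached the database.
        void rollback() { pending.clear(); }
    }

    public static void main(String[] args) {
        List<String> database = new ArrayList<>();
        UnitOfWork uow = new UnitOfWork(database);
        uow.queue("insert into product (id, quantity) values (1, 7)");
        uow.queue("update product set quantity = 6 where id = 1");
        System.out.println(database.size()); // 0
        uow.commit();
        System.out.println(database.size()); // 2
    }
}
```

Because statements are sent only at flush time, the window in which the database has to hold locks for this unit of work stays short.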

Session

Hibernate's Session wraps a JDBC connection. Its main job is to maintain the first-level cache, that is, the "repeatable read" persistence context. This is implemented in the getRow method of Loader.

Behind the scenes, the Hibernate Session wraps a JDBC java.sql.Connection and acts as a factory for org.hibernate.Transaction instances. It maintains a generally "repeatable read" persistence context (first level cache) of the application domain model.

The Hibernate Session acts as a transaction-scoped cache providing repeatable reads for lookup by identifier and queries that result in loading entities.

Loader.getRow

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java

	/**
	 * Resolve any IDs for currently loaded objects, duplications within the
	 * <tt>ResultSet</tt>, etc. Instantiate empty objects to be initialized from the
	 * <tt>ResultSet</tt>. Return an array of objects (a row of results) and an
	 * array of booleans (by side-effect) that determine whether the corresponding
	 * object should be initialized.
	 */
	private Object[] getRow(
			final ResultSet rs,
			final Loadable[] persisters,
			final EntityKey[] keys,
			final Object optionalObject,
			final EntityKey optionalObjectKey,
			final LockMode[] lockModes,
			final List hydratedObjects,
			final SharedSessionContractImplementor session) throws HibernateException, SQLException {
		final int cols = persisters.length;
		final EntityAliases[] descriptors = getEntityAliases();

		if ( LOG.isDebugEnabled() ) {
			LOG.debugf( "Result row: %s", StringHelper.toString( keys ) );
		}

		final Object[] rowResults = new Object[cols];

		for ( int i = 0; i < cols; i++ ) {

			Object object = null;
			EntityKey key = keys[i];

			if ( keys[i] == null ) {
				//do nothing
			}
			else {
				//If the object is already loaded, return the loaded one
				object = session.getEntityUsingInterceptor( key );
				if ( object != null ) {
					//its already loaded so don't need to hydrate it
					instanceAlreadyLoaded(
							rs,
							i,
							persisters[i],
							key,
							object,
							lockModes[i],
							session
					);
				}
				else {
					object = instanceNotYetLoaded(
							rs,
							i,
							persisters[i],
							descriptors[i].getRowIdAlias(),
							key,
							lockModes[i],
							optionalObjectKey,
							optionalObject,
							hydratedObjects,
							session
					);
				}
			}

			rowResults[i] = object;

		}

		return rowResults;
	}
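  • When the key is not null, this method sets the value of object. It first calls session.getEntityUsingInterceptor to look up the entity in the session by key; if the result is not null it calls instanceAlreadyLoaded, otherwise it calls instanceNotYetLoaded to create object.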
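Stripped of lock modes, persisters and aliases, the branching in getRow boils down to the sketch below. The class GetRowSketch, its Entity type and the names array (standing in for ResultSet column values) are assumptions made for illustration and are not part of Hibernate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of the getRow branching; not Hibernate's Loader.
class GetRowSketch {

    static class Entity {
        final long id;
        String name;
        Entity(long id) { this.id = id; }
    }

    final Map<Long, Entity> persistenceContext = new HashMap<>();
    final List<Entity> hydratedObjects = new ArrayList<>();

    // keys[i] is the id found at position i of the current row, or null;
    // names[i] stands in for the column values of that position.
    Entity[] getRow(Long[] keys, String[] names) {
        Entity[] rowResults = new Entity[keys.length];
        for (int i = 0; i < keys.length; i++) {
            Long key = keys[i];
            if (key == null) {
                continue; // no entity at this position
            }
            Entity existing = persistenceContext.get(key);
            if (existing != null) {
                // already loaded: reuse it and ignore the row values
                rowResults[i] = existing;
            } else {
                // not yet loaded: create it, register it, remember it for later
                Entity created = new Entity(key);
                created.name = names[i];
                persistenceContext.put(key, created);
                hydratedObjects.add(created);
                rowResults[i] = created;
            }
        }
        return rowResults;
    }

    public static void main(String[] args) {
        GetRowSketch loader = new GetRowSketch();
        Entity first = loader.getRow(new Long[] {1L}, new String[] {"old"})[0];
        Entity second = loader.getRow(new Long[] {1L}, new String[] {"new"})[0];
        System.out.println(first == second); // true
        System.out.println(second.name);     // old
    }
}
```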

SessionImpl.getEntityUsingInterceptor

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

public final class SessionImpl
		extends AbstractSessionImpl
		implements EventSource, SessionImplementor, HibernateEntityManagerImplementor {

		//......

	@Override
	public Object getEntityUsingInterceptor(EntityKey key) throws HibernateException {
		checkOpenOrWaitingForAutoClose();
		// todo : should this get moved to PersistentContext?
		// logically, is PersistentContext the "thing" to which an interceptor gets attached?
		final Object result = persistenceContext.getEntity( key );
		if ( result == null ) {
			final Object newObject = getInterceptor().getEntity( key.getEntityName(), key.getIdentifier() );
			if ( newObject != null ) {
				lock( newObject, LockMode.NONE );
			}
			return newObject;
		}
		else {
			return result;
		}
	}

		//......
}
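  • getEntityUsingInterceptor first tries to get the entity from persistenceContext; if nothing is found, it calls getInterceptor().getEntity. Unless one is configured explicitly, the default interceptor is EmptyInterceptor, whose getEntity method returns null.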
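The lookup order can be illustrated with a small stand-alone sketch. EntityInterceptor, EMPTY and MiniSession here are hypothetical simplifications, and the sketch leaves out the lock( newObject, LockMode.NONE ) call that the real method performs on an interceptor-supplied object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "persistence context first, interceptor second".
class InterceptorFallbackSketch {

    interface EntityInterceptor {
        Object getEntity(String entityName, Object id);
    }

    // Plays the role of the default interceptor: it never supplies an entity.
    static final EntityInterceptor EMPTY = (entityName, id) -> null;

    static class MiniSession {
        final Map<String, Object> persistenceContext = new HashMap<>();
        private final EntityInterceptor interceptor;

        MiniSession(EntityInterceptor interceptor) { this.interceptor = interceptor; }

        Object getEntityUsingInterceptor(String entityName, Object id) {
            Object result = persistenceContext.get(entityName + "#" + id);
            if (result != null) {
                return result;                           // managed instance wins
            }
            return interceptor.getEntity(entityName, id); // may be null
        }
    }

    public static void main(String[] args) {
        MiniSession plain = new MiniSession(EMPTY);
        System.out.println(plain.getEntityUsingInterceptor("Product", 1L)); // null

        MiniSession custom = new MiniSession((name, id) -> name + ":" + id);
        System.out.println(custom.getEntityUsingInterceptor("Product", 1L)); // Product:1
    }
}
```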

StatefulPersistenceContext

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/engine/internal/StatefulPersistenceContext.java

public class StatefulPersistenceContext implements PersistenceContext {
	//......

	// Loaded entity instances, by EntityKey
	private Map<EntityKey, Object> entitiesByKey;

	@Override
	public Object getEntity(EntityKey key) {
		return entitiesByKey.get( key );
	}

	@Override
	public void addEntity(EntityKey key, Object entity) {
		entitiesByKey.put( key, entity );
		if( batchFetchQueue != null ) {
			getBatchFetchQueue().removeBatchLoadableEntityKey(key);
		}
	}

	@Override
	public Object removeEntity(EntityKey key) {
		final Object entity = entitiesByKey.remove( key );
		final Iterator itr = entitiesByUniqueKey.values().iterator();
		while ( itr.hasNext() ) {
			if ( itr.next() == entity ) {
				itr.remove();
			}
		}
		// Clear all parent cache
		parentsByChild.clear();
		entitySnapshotsByKey.remove( key );
		nullifiableEntityKeys.remove( key );
		if( batchFetchQueue != null ) {
			getBatchFetchQueue().removeBatchLoadableEntityKey(key);
			getBatchFetchQueue().removeSubselect(key);
		}
		return entity;
	}

	@Override
	public void replaceDelayedEntityIdentityInsertKeys(EntityKey oldKey, Serializable generatedId) {
		final Object entity = entitiesByKey.remove( oldKey );
		final EntityEntry oldEntry = entityEntryContext.removeEntityEntry( entity );
		parentsByChild.clear();

		final EntityKey newKey = session.generateEntityKey( generatedId, oldEntry.getPersister() );
		addEntity( newKey, entity );
		addEntry(
				entity,
				oldEntry.getStatus(),
				oldEntry.getLoadedState(),
				oldEntry.getRowId(),
				generatedId,
				oldEntry.getVersion(),
				oldEntry.getLockMode(),
				oldEntry.isExistsInDatabase(),
				oldEntry.getPersister(),
				oldEntry.isBeingReplicated()
		);
	}

	//......
}
  • StatefulPersistenceContext maintains a map named entitiesByKey; getEntity simply looks up that map by EntityKey. It also provides addEntity, removeEntity, replaceDelayedEntityIdentityInsertKeys and other methods that modify the map.
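For such a map to work, the key needs value-based equals and hashCode so that two keys built separately for the same entity find the same entry. The sketch below shows this with a hypothetical Key class (entity name plus identifier) and a hypothetical MiniPersistenceContext; Hibernate's actual EntityKey carries more information than this.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of a key-indexed entity map; not Hibernate's classes.
class EntityKeySketch {

    static final class Key {
        final String entityName;
        final Object id;

        Key(String entityName, Object id) {
            this.entityName = Objects.requireNonNull(entityName);
            this.id = Objects.requireNonNull(id);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return entityName.equals(other.entityName) && id.equals(other.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(entityName, id);
        }
    }

    static class MiniPersistenceContext {
        private final Map<Key, Object> entitiesByKey = new HashMap<>();

        Object getEntity(Key key) { return entitiesByKey.get(key); }

        void addEntity(Key key, Object entity) { entitiesByKey.put(key, entity); }

        Object removeEntity(Key key) { return entitiesByKey.remove(key); }

        // Re-register an entity once its generated identifier is known.
        void replaceKey(Key oldKey, Key newKey) {
            Object entity = entitiesByKey.remove(oldKey);
            if (entity != null) {
                entitiesByKey.put(newKey, entity);
            }
        }
    }

    public static void main(String[] args) {
        MiniPersistenceContext ctx = new MiniPersistenceContext();
        Object product = new Object();
        ctx.addEntity(new Key("Product", 1L), product);
        // A second, separately created key with equal content finds the same entity.
        System.out.println(ctx.getEntity(new Key("Product", 1L)) == product); // true
    }
}
```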

instanceAlreadyLoaded

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java

public abstract class Loader {

	//......

	/**
	 * The entity instance is already in the session cache
	 */
	private void instanceAlreadyLoaded(
			final ResultSet rs,
			final int i,
			final Loadable persister,
			final EntityKey key,
			final Object object,
			final LockMode requestedLockMode,
			final SharedSessionContractImplementor session)
			throws HibernateException, SQLException {
		if ( !persister.isInstance( object ) ) {
			throw new WrongClassException(
					"loaded object was of wrong class " + object.getClass(),
					key.getIdentifier(),
					persister.getEntityName()
			);
		}

		if ( LockMode.NONE != requestedLockMode && upgradeLocks() ) { //no point doing this if NONE was requested
			final EntityEntry entry = session.getPersistenceContext().getEntry( object );
			if ( entry.getLockMode().lessThan( requestedLockMode ) ) {
				//we only check the version when _upgrading_ lock modes
				if ( persister.isVersioned() ) {
					checkVersion( i, persister, key.getIdentifier(), object, rs, session );
				}
				//we need to upgrade the lock mode to the mode requested
				entry.setLockMode( requestedLockMode );
			}
		}
	}

	//......
}
  • instanceAlreadyLoaded mainly checks that the type is correct and, based on the requested lock mode, decides whether the lock mode needs to be upgraded.
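The upgrade rule can be sketched in isolation as follows. The Level enum, the Entry class and the use of IllegalStateException for a version mismatch are assumptions of this sketch; they stand in for Hibernate's LockMode, EntityEntry and checkVersion and do not reproduce them.

```java
// Hypothetical sketch of the lock upgrade rule; not Hibernate's LockMode or EntityEntry.
class LockUpgradeSketch {

    // Lock levels ordered from weakest to strongest.
    enum Level {
        NONE, READ, WRITE;

        boolean lessThan(Level other) {
            return ordinal() < other.ordinal();
        }
    }

    static class Entry {
        Level lockMode;
        final long version;

        Entry(Level lockMode, long version) {
            this.lockMode = lockMode;
            this.version = version;
        }
    }

    static void instanceAlreadyLoaded(Entry entry, Level requested, long rowVersion) {
        if (requested == Level.NONE) {
            return; // no point doing anything if NONE was requested
        }
        if (entry.lockMode.lessThan(requested)) {
            // the version is only checked when upgrading
            if (entry.version != rowVersion) {
                throw new IllegalStateException("row was changed by another transaction");
            }
            entry.lockMode = requested;
        }
    }

    public static void main(String[] args) {
        Entry entry = new Entry(Level.READ, 1L);
        instanceAlreadyLoaded(entry, Level.WRITE, 1L);
        System.out.println(entry.lockMode); // WRITE
    }
}
```

Note that a request for a weaker or equal lock leaves the entry untouched and skips the version check entirely.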

instanceNotYetLoaded

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java

public abstract class Loader {
	//......

	/**
	 * The entity instance is not in the session cache
	 */
	private Object instanceNotYetLoaded(
			final ResultSet rs,
			final int i,
			final Loadable persister,
			final String rowIdAlias,
			final EntityKey key,
			final LockMode lockMode,
			final EntityKey optionalObjectKey,
			final Object optionalObject,
			final List hydratedObjects,
			final SharedSessionContractImplementor session)
			throws HibernateException, SQLException {
		final String instanceClass = getInstanceClass(
				rs,
				i,
				persister,
				key.getIdentifier(),
				session
		);

		// see if the entity defines reference caching, and if so use the cached reference (if one).
		if ( session.getCacheMode().isGetEnabled() && persister.canUseReferenceCacheEntries() ) {
			final EntityDataAccess cache = persister.getCacheAccessStrategy();
			final Object ck = cache.generateCacheKey(
					key.getIdentifier(),
					persister,
					session.getFactory(),
					session.getTenantIdentifier()
					);
			final Object cachedEntry = CacheHelper.fromSharedCache( session, ck, cache );
			if ( cachedEntry != null ) {
				CacheEntry entry = (CacheEntry) persister.getCacheEntryStructure().destructure( cachedEntry, factory );
				return ( (ReferenceCacheEntryImpl) entry ).getReference();
			}
		}

		final Object object;
		if ( optionalObjectKey != null && key.equals( optionalObjectKey ) ) {
			//its the given optional object
			object = optionalObject;
		}
		else {
			// instantiate a new instance
			object = session.instantiate( instanceClass, key.getIdentifier() );
		}

		//need to hydrate it.

		// grab its state from the ResultSet and keep it in the Session
		// (but don't yet initialize the object itself)
		// note that we acquire LockMode.READ even if it was not requested
		LockMode acquiredLockMode = lockMode == LockMode.NONE ? LockMode.READ : lockMode;
		loadFromResultSet(
				rs,
				i,
				object,
				instanceClass,
				key,
				rowIdAlias,
				acquiredLockMode,
				persister,
				session
		);

		//materialize associations (and initialize the object) later
		hydratedObjects.add( object );

		return object;
	}

	/**
	 * Hydrate the state an object from the SQL <tt>ResultSet</tt>, into
	 * an array or "hydrated" values (do not resolve associations yet),
	 * and pass the hydrates state to the session.
	 */
	private void loadFromResultSet(
			final ResultSet rs,
			final int i,
			final Object object,
			final String instanceEntityName,
			final EntityKey key,
			final String rowIdAlias,
			final LockMode lockMode,
			final Loadable rootPersister,
			final SharedSessionContractImplementor session) throws SQLException, HibernateException {

		final Serializable id = key.getIdentifier();

		// Get the persister for the _subclass_
		final Loadable persister = (Loadable) getFactory().getEntityPersister( instanceEntityName );

		if ( LOG.isTraceEnabled() ) {
			LOG.tracef(
					"Initializing object from ResultSet: %s",
					MessageHelper.infoString(
							persister,
							id,
							getFactory()
					)
			);
		}

		boolean fetchAllPropertiesRequested = isEagerPropertyFetchEnabled( i );

		// add temp entry so that the next step is circular-reference
		// safe - only needed because some types don't take proper
		// advantage of two-phase-load (esp. components)
		TwoPhaseLoad.addUninitializedEntity(
				key,
				object,
				persister,
				lockMode,
				session
		);

		//This is not very nice (and quite slow):
		final String[][] cols = persister == rootPersister ?
				getEntityAliases()[i].getSuffixedPropertyAliases() :
				getEntityAliases()[i].getSuffixedPropertyAliases( persister );

		final Object[] values = persister.hydrate(
				rs,
				id,
				object,
				rootPersister,
				cols,
				fetchAllPropertiesRequested,
				session
		);

		final Object rowId = persister.hasRowId() ? rs.getObject( rowIdAlias ) : null;

		final AssociationType[] ownerAssociationTypes = getOwnerAssociationTypes();
		if ( ownerAssociationTypes != null && ownerAssociationTypes[i] != null ) {
			String ukName = ownerAssociationTypes[i].getRHSUniqueKeyPropertyName();
			if ( ukName != null ) {
				final int index = ( (UniqueKeyLoadable) persister ).getPropertyIndex( ukName );
				final Type type = persister.getPropertyTypes()[index];

				// polymorphism not really handled completely correctly,
				// perhaps...well, actually its ok, assuming that the
				// entity name used in the lookup is the same as the
				// the one used here, which it will be

				EntityUniqueKey euk = new EntityUniqueKey(
						rootPersister.getEntityName(), //polymorphism comment above
						ukName,
						type.semiResolve( values[index], session, object ),
						type,
						persister.getEntityMode(),
						session.getFactory()
				);
				session.getPersistenceContext().addEntity( euk, object );
			}
		}

		TwoPhaseLoad.postHydrate(
				persister,
				id,
				values,
				rowId,
				object,
				lockMode,
				session
		);

	}

	//......
}
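  • instanceNotYetLoaded mainly hydrates the object: it calls loadFromResultSet to extract the object's state from the ResultSet, then adds the object to hydratedObjects and returns it. loadFromResultSet first calls TwoPhaseLoad.addUninitializedEntity, which calls session.getPersistenceContext().addEntity to add the object to the StatefulPersistenceContext; it then uses persister.hydrate to extract the values from the ResultSet, and finally calls TwoPhaseLoad.postHydrate to create the managedEntity and associate it with the object.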

initializeEntitiesAndCollections

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java

public abstract class Loader {
	//......

	private void initializeEntitiesAndCollections(
			final List hydratedObjects,
			final Object resultSetId,
			final SharedSessionContractImplementor session,
			final boolean readOnly,
			List<AfterLoadAction> afterLoadActions) throws HibernateException {

		final CollectionPersister[] collectionPersisters = getCollectionPersisters();
		if ( collectionPersisters != null ) {
			for ( CollectionPersister collectionPersister : collectionPersisters ) {
				if ( collectionPersister.isArray() ) {
					//for arrays, we should end the collection load before resolving
					//the entities, since the actual array instances are not instantiated
					//during loading
					//TODO: or we could do this polymorphically, and have two
					//      different operations implemented differently for arrays
					endCollectionLoad( resultSetId, session, collectionPersister );
				}
			}
		}

		//important: reuse the same event instances for performance!
		final PreLoadEvent pre;
		final PostLoadEvent post;
		if ( session.isEventSource() ) {
			pre = new PreLoadEvent( (EventSource) session );
			post = new PostLoadEvent( (EventSource) session );
		}
		else {
			pre = null;
			post = null;
		}

		if ( hydratedObjects != null ) {
			int hydratedObjectsSize = hydratedObjects.size();
			LOG.tracev( "Total objects hydrated: {0}", hydratedObjectsSize );
			for ( Object hydratedObject : hydratedObjects ) {
				TwoPhaseLoad.initializeEntity( hydratedObject, readOnly, session, pre );
			}
		}

		if ( collectionPersisters != null ) {
			for ( CollectionPersister collectionPersister : collectionPersisters ) {
				if ( !collectionPersister.isArray() ) {
					//for sets, we should end the collection load after resolving
					//the entities, since we might call hashCode() on the elements
					//TODO: or we could do this polymorphically, and have two
					//      different operations implemented differently for arrays
					endCollectionLoad( resultSetId, session, collectionPersister );
				}
			}
		}

		// Until this entire method is refactored w/ polymorphism, postLoad was
		// split off from initializeEntity.  It *must* occur after
		// endCollectionLoad to ensure the collection is in the
		// persistence context.
		if ( hydratedObjects != null ) {
			for ( Object hydratedObject : hydratedObjects ) {
				TwoPhaseLoad.postLoad( hydratedObject, session, post );
				if ( afterLoadActions != null ) {
					for ( AfterLoadAction afterLoadAction : afterLoadActions ) {
						final EntityEntry entityEntry = session.getPersistenceContext().getEntry( hydratedObject );
						if ( entityEntry == null ) {
							// big problem
							throw new HibernateException(
									"Could not locate EntityEntry immediately after two-phase load"
							);
						}
						afterLoadAction.afterLoad( session, hydratedObject, (Loadable) entityEntry.getPersister() );
					}
				}
			}
		}
	}

	//......
}
  • initializeEntitiesAndCollections calls TwoPhaseLoad.initializeEntity to initialize each hydratedObject; that method calls persister.setPropertyValues( entity, hydratedState ) to copy the hydratedState values into the entity.
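The split between hydrating and initializing can be pictured with the sketch below. It is a deliberately small model: TwoPhaseLoadSketch, its Product class and the two-element state array are hypothetical, and association resolution, events and collections are left out.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a two-phase load; not Hibernate's TwoPhaseLoad.
class TwoPhaseLoadSketch {

    static class Product {
        Long id;
        String name;
        Long quantity;
    }

    // Phase 1 output: raw row values kept beside the still-empty instance.
    private final Map<Product, Object[]> hydratedState = new IdentityHashMap<>();
    final List<Product> hydratedObjects = new ArrayList<>();

    // Phase 1: create the instance and remember its raw state, without setting properties.
    Product hydrate(Long id, Object[] rowValues) {
        Product product = new Product();
        product.id = id;
        hydratedState.put(product, rowValues.clone());
        hydratedObjects.add(product);
        return product;
    }

    // Phase 2: copy the remembered state into every hydrated instance.
    void initializeEntities() {
        for (Product product : hydratedObjects) {
            Object[] state = hydratedState.remove(product);
            product.name = (String) state[0];
            product.quantity = (Long) state[1];
        }
        hydratedObjects.clear();
    }

    public static void main(String[] args) {
        TwoPhaseLoadSketch loader = new TwoPhaseLoadSketch();
        Product product = loader.hydrate(1L, new Object[] {"pen", 7L});
        System.out.println(product.name); // null
        loader.initializeEntities();
        System.out.println(product.name); // pen
    }
}
```

Deferring the second phase until all rows are read means every instance is already registered when the properties are filled in.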

Summary

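  • A write-behind cache is one kind of caching strategy. The main idea is that an update is first applied to the cache, and the cache later persists the changes to the backing store in batches (for example, batched updates to a database). The benefit is that multiple operations on the same data can be merged, which reduces I/O.
  • To shorten the time during which a database connection holds locks, Hibernate adopts a transactional write-behind strategy. Its persistence context plays the role of a transactional write-behind cache: changes to entities are first applied in memory and are flushed to the database at a later point. This is embodied in the Session class.
  • Hibernate's Session wraps a JDBC connection. Its main job is to maintain the first-level cache, that is, the "repeatable read" persistence context. This is implemented in the getRow method of Loader.
  • When the key is not null, getRow sets the value of object. It first calls session.getEntityUsingInterceptor to look up the entity in the session by key; if the result is not null it calls instanceAlreadyLoaded, otherwise it calls instanceNotYetLoaded to create object.
  • getEntityUsingInterceptor first tries to get the entity from persistenceContext; if nothing is found, it calls getInterceptor().getEntity. Unless one is configured explicitly, the default interceptor is EmptyInterceptor, whose getEntity method returns null.
  • StatefulPersistenceContext maintains a map named entitiesByKey; getEntity simply looks up that map by EntityKey. It also provides addEntity, removeEntity, replaceDelayedEntityIdentityInsertKeys and other methods that modify the map.
  • instanceAlreadyLoaded mainly checks that the type is correct and, based on the requested lock mode, decides whether the lock mode needs to be upgraded. instanceNotYetLoaded mainly hydrates the object: it calls loadFromResultSet to extract the object's state from the ResultSet, then adds the object to hydratedObjects and returns it. loadFromResultSet first calls TwoPhaseLoad.addUninitializedEntity, which calls session.getPersistenceContext().addEntity to add the object to the StatefulPersistenceContext; it then uses persister.hydrate to extract the values from the ResultSet, and finally calls TwoPhaseLoad.postHydrate to create the managedEntity and associate it with the object.
  • initializeEntitiesAndCollections calls TwoPhaseLoad.initializeEntity to initialize each hydratedObject; that method calls persister.setPropertyValues( entity, hydratedState ) to copy the hydratedState values into the entity.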
