[Reactive]在SpringWebFlux使用缓存

3,481 阅读4分钟

Project Reactor中的Cache对象

虽然reactor-netty使用nio的方式读写数据源大大加快了程序的响应速度,但是无论使用什么方法,远程数据读写都无法追赶上本地缓存的速度。

Project Reactor也提供了Cache的接入手段。在加入以下依赖,即可使用缓存对象:

<!-- https://mvnrepository.com/artifact/io.projectreactor.addons/reactor-extra -->
<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.3.4.RELEASE</version>
</dependency>
  1. CacheMono
  2. CacheFlux

从名字上可以看出,CacheMonoMono相对,代表[0..1]的对象,CacheFluxFlux相对,代表[0..N]的对象。

事实上,在Spring Cloud Gateway源码中,就使用到了CacheFlux,用于缓存路由信息。

public CachingRouteDefinitionLocator(RouteDefinitionLocator delegate) {
		this.delegate = delegate;
		routeDefinitions = CacheFlux.lookup(cache, "routeDefs", RouteDefinition.class)
				.onCacheMissResume(this.delegate::getRouteDefinitions);
	}

CacheFlux官方文档例子

官方一下两个例子,一个是Generic cache entry points,另一个是Map endpoints

Generic cache entry points

	 AtomicReference<Context> storeRef = new AtomicReference<>(Context.empty());
     Flux<Integer> cachedFlux = CacheFlux
                .lookup(k -> Mono.justOrEmpty(storeRef.get().getOrEmpty(k))
                                 .cast(Integer.class)
                                 .flatMap(max -> Flux.range(1, max)
                                                     .materialize()
                                                     .collectList()),
                                key)
                .onCacheMissResume(Flux.range(1, 10))
                .andWriteWith((k, sigs) -> Flux.fromIterable(sigs)
                                               .dematerialize()
                                               .last()
                                               .doOnNext(max -> storeRef.updateAndGet(ctx -> ctx.put(k, max)))
                                               .then());

Map endpoints

    String key = "myCategory";
    LoadingCache<String, Object> graphs = Caffeine
        .newBuilder()
        .maximumSize(10_000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .refreshAfterWrite(1, TimeUnit.MINUTES)
        .build(key -> createExpensiveGraph(key));

    Flux<Integer> cachedMyCategory = CacheFlux
        .lookup(graphs.asMap(), key, Integer.class)
        .onCacheMissResume(repository.findAllByCategory(key));

我们以Map endpoints的例子说明。

在上面Map endpoints的例子上可以看到,使用了Caffeine缓存,这里也可以换成Guava Cache或者ConcurrentHashMap甚至Hashmap,使用Hashmap的时候,取决并发逻辑,尤其在写缓存的时候。

代码逻辑很简单,通过lookUp方法查找缓存,如果没有找到,通过onCacheMissResume方法查找数据源。

遇到的问题

在定义了LoadingCache,我希望使用refresh(key)的方式更新缓存,但是发生问题。下面是我使用的LoadingCache.build()方法定义的内容。

.build(new CacheLoader<String, Object>() {
	@Override
    public YourType load(String key) throws Exception {
    	return getYourTypeValue(key); // 同步代码
    }
})

但是在刷新缓存后,再使用CacheFlux则会报错:

Content of cache for key xxx cannot be cast to List<Signal>

说一个不成熟的结论

不能通过refresh()方法来刷新缓存

为什么会这样?我们接下来看一这一块的源码是如何写的。

源码解析

public static <KEY, VALUE> FluxCacheBuilderMapMiss<VALUE> lookup(Map<KEY, ? super List> cacheMap, KEY key, Class<VALUE> valueClass) {
	return otherSupplier ->
		Flux.defer(() -> {
			Object fromCache = cacheMap.get(key);
            if (fromCache == null) {
				return otherSupplier.get()
					.materialize()
					.collectList()
					.doOnNext(signals -> cacheMap.put(key, signals))
					.flatMapIterable(Function.identity())
					.dematerialize();
			} else if (fromCache instanceof List) {
				try {
					@SuppressWarnings("unchecked")
					List<Signal<VALUE>> fromCacheSignals = (List<Signal<VALUE>>) fromCache;
					return Flux.fromIterable(fromCacheSignals)
                    	.dematerialize();
				}
				catch (Throwable cause) {
					return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " cannot be cast to List<Signal>", cause));
				}
			} else {
				return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " is not a List"));
			}
		});
}

上面的逻辑比较简单,其中otherSupplier实际上指的是onCacheMissResume方法中的参数;Flux.defer顾名思义,延迟加载,当需要的时候才进行加载。

首先,代码从cacheMap中拿到key对应的缓存内容,接下来就是进行判断,如果没有,则通过otherSupplier去读取数据源。

if (fromCache == null) {
	return otherSupplier.get()
		.materialize()
		.collectList()
		.doOnNext(signals -> cacheMap.put(key, signals))
		.flatMapIterable(Function.identity())
		.dematerialize();
}

其中有三个细节,一个是materialize方法,将对应数据具象化。我们知道,在reactive中Flux和Mono各种操作符,只是对数据做操作的描述,而不是数据对象本身,我们不能将操作符做缓存。做一个比方,我们各种操作符就相当于管道,而数据通过管道做对应的操作,最后形成所需要的内容,而所需要的内容被subscibe()方法消费,而真正的被消费的对象(或者说数据)则是源头对象中的subscription

public final Flux<Signal<T>> materialize() {
	return onAssembly(new FluxMaterialize<>(this));
}

通过materialize我们将数据转换成Signal<T>,而这个Singal里则包含我们所需要消费的数据,即可作为消费的数据来源。

另外一个细节在doOnNext方法中,拿到signals对象后,将数据放回cacheMap中,所以我们无需在代码中显式的将数据插入缓存

.doOnNext(signals -> cacheMap.put(key, signals))

第三个,则是dematerialize(),显然该方法与materialize()相对,是将将Signal<T>转换成T,进而进行下一步的操作或者消费。

public final <X> Flux<X> dematerialize() {
	@SuppressWarnings("unchecked")
	Flux<Signal<X>> thiz = (Flux<Signal<X>>) this;
	return onAssembly(new FluxDematerialize<>(thiz));
}

从中我们发现Singal是一个容器,用于存放实际数据数据。

接下来,我们来看如果在cacheMap中能够拿到key对应的缓存内容。如果是拿到的缓存是List对象,则进行类型转换,将Signal<T>转换成T,即dematerialize()

else if (fromCache instanceof List) {
	try {
    	@SuppressWarnings("unchecked")
		List<Signal<VALUE>> fromCacheSignals = (List<Signal<VALUE>>) fromCache;
		return Flux.fromIterable(fromCacheSignals)
        	.dematerialize();
	}

看到这里我们就知道为什么通过直接缓存刷新之后,再使用缓存会报错?在刷新的代码中,直接将缓存类型T放入缓存,而不是Signal<T>,当再次使用缓存时候,自然不能做类型转换(List<Signal<VALUE>>) fromCache,于是报出类型转换错误。

.build(new CacheLoader<String, Object>() {
	@Override
    public YourType load(String key) throws Exception {
    	return getYourTypeValue(key); // 同步代码
    }
})

总结

1. cacheMap查询 -> 如果没有 -> 通过 onCacheMissResume中参数的方法查询数据 
			-> 通过materialize()方法将数据具象化,把T转换为Signal<T> 
            -> 写入缓存map -> 通过dematerialize()方法,反具象化,把Sinal<T>转T -> 交给下步使用

2. cacheMap查询 -> 如果有 -> 通过dematerialize()方法,反具象化,把Sinal<T>转T -> 交给下步使用

如何改进?(课后题 :D)

从上面的代码分析我们可以看到,缓存中存放的是Singal<T>,所以我们需要将T转换成Singal<T>放入缓存。

.build(new CacheLoader<String, Object>() {
	@Override
    public YourType load(String key) throws Exception {
    	return Signal.next(getYourTypeValue(key)); // 同步代码
    }
})

通过Signal.next(getYourTypeValue(key))这样的方法,放入缓存后是否可用,待各位读者自行验证。