Project Reactor中的Cache对象
虽然reactor-netty使用nio的方式读写数据源大大加快了程序的响应速度,但是无论使用什么方法,远程数据读写都无法追赶上本地缓存的速度。
Project Reactor也提供了Cache的接入手段。在加入以下依赖,即可使用缓存对象:
<!-- https://mvnrepository.com/artifact/io.projectreactor.addons/reactor-extra -->
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
<version>3.3.4.RELEASE</version>
</dependency>
从名字上可以看出,CacheMono
和Mono
相对,代表[0..1]的对象,CacheFlux
和Flux
相对,代表[0..N]的对象。
事实上,在Spring Cloud Gateway源码中,就使用到了CacheFlux
,用于缓存路由信息。
public CachingRouteDefinitionLocator(RouteDefinitionLocator delegate) {
this.delegate = delegate;
routeDefinitions = CacheFlux.lookup(cache, "routeDefs", RouteDefinition.class)
.onCacheMissResume(this.delegate::getRouteDefinitions);
}
CacheFlux官方文档例子
官方一下两个例子,一个是Generic cache entry points
,另一个是Map endpoints
Generic cache entry points
AtomicReference<Context> storeRef = new AtomicReference<>(Context.empty());
Flux<Integer> cachedFlux = CacheFlux
.lookup(k -> Mono.justOrEmpty(storeRef.get().getOrEmpty(k))
.cast(Integer.class)
.flatMap(max -> Flux.range(1, max)
.materialize()
.collectList()),
key)
.onCacheMissResume(Flux.range(1, 10))
.andWriteWith((k, sigs) -> Flux.fromIterable(sigs)
.dematerialize()
.last()
.doOnNext(max -> storeRef.updateAndGet(ctx -> ctx.put(k, max)))
.then());
Map endpoints
String key = "myCategory";
LoadingCache<String, Object> graphs = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.refreshAfterWrite(1, TimeUnit.MINUTES)
.build(key -> createExpensiveGraph(key));
Flux<Integer> cachedMyCategory = CacheFlux
.lookup(graphs.asMap(), key, Integer.class)
.onCacheMissResume(repository.findAllByCategory(key));
我们以Map endpoints的例子说明。
在上面Map endpoints
的例子上可以看到,使用了Caffeine
缓存,这里也可以换成Guava Cache
或者ConcurrentHashMap
甚至Hashmap
,使用Hashmap
的时候,取决并发逻辑,尤其在写缓存的时候。
代码逻辑很简单,通过lookUp
方法查找缓存,如果没有找到,通过onCacheMissResume
方法查找数据源。
遇到的问题
在定义了LoadingCache,我希望使用refresh(key)
的方式更新缓存,但是发生问题。下面是我使用的LoadingCache.build()
方法定义的内容。
.build(new CacheLoader<String, Object>() {
@Override
public YourType load(String key) throws Exception {
return getYourTypeValue(key); // 同步代码
}
})
但是在刷新缓存后,再使用CacheFlux
则会报错:
Content of cache for key xxx cannot be cast to List<Signal>
说一个不成熟的结论:
不能通过refresh()
方法来刷新缓存
为什么会这样?我们接下来看一这一块的源码是如何写的。
源码解析
public static <KEY, VALUE> FluxCacheBuilderMapMiss<VALUE> lookup(Map<KEY, ? super List> cacheMap, KEY key, Class<VALUE> valueClass) {
return otherSupplier ->
Flux.defer(() -> {
Object fromCache = cacheMap.get(key);
if (fromCache == null) {
return otherSupplier.get()
.materialize()
.collectList()
.doOnNext(signals -> cacheMap.put(key, signals))
.flatMapIterable(Function.identity())
.dematerialize();
} else if (fromCache instanceof List) {
try {
@SuppressWarnings("unchecked")
List<Signal<VALUE>> fromCacheSignals = (List<Signal<VALUE>>) fromCache;
return Flux.fromIterable(fromCacheSignals)
.dematerialize();
}
catch (Throwable cause) {
return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " cannot be cast to List<Signal>", cause));
}
} else {
return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " is not a List"));
}
});
}
上面的逻辑比较简单,其中otherSupplier
实际上指的是onCacheMissResume
方法中的参数;Flux.defer
顾名思义,延迟加载,当需要的时候才进行加载。
首先,代码从cacheMap中拿到key对应的缓存内容,接下来就是进行判断,如果没有,则通过otherSupplier
去读取数据源。
if (fromCache == null) {
return otherSupplier.get()
.materialize()
.collectList()
.doOnNext(signals -> cacheMap.put(key, signals))
.flatMapIterable(Function.identity())
.dematerialize();
}
其中有三个细节,一个是materialize
方法,将对应数据具象化。我们知道,在reactive中Flux和Mono各种操作符,只是对数据做操作的描述,而不是数据对象本身,我们不能将操作符做缓存。做一个比方,我们各种操作符就相当于管道,而数据通过管道做对应的操作,最后形成所需要的内容,而所需要的内容被subscibe()
方法消费,而真正的被消费的对象(或者说数据)则是源头对象中的subscription
。
public final Flux<Signal<T>> materialize() {
return onAssembly(new FluxMaterialize<>(this));
}
通过materialize
我们将数据转换成Signal<T>
,而这个Singal
里则包含我们所需要消费的数据,即可作为消费的数据来源。
另外一个细节在doOnNext
方法中,拿到signals
对象后,将数据放回cacheMap中,所以我们无需在代码中显式的将数据插入缓存。
.doOnNext(signals -> cacheMap.put(key, signals))
第三个,则是dematerialize()
,显然该方法与materialize()
相对,是将将Signal<T>
转换成T
,进而进行下一步的操作或者消费。
public final <X> Flux<X> dematerialize() {
@SuppressWarnings("unchecked")
Flux<Signal<X>> thiz = (Flux<Signal<X>>) this;
return onAssembly(new FluxDematerialize<>(thiz));
}
从中我们发现Singal
是一个容器,用于存放实际数据数据。
接下来,我们来看如果在cacheMap中能够拿到key对应的缓存内容。如果是拿到的缓存是List对象,则进行类型转换,将Signal<T>
转换成T
,即dematerialize()
。
else if (fromCache instanceof List) {
try {
@SuppressWarnings("unchecked")
List<Signal<VALUE>> fromCacheSignals = (List<Signal<VALUE>>) fromCache;
return Flux.fromIterable(fromCacheSignals)
.dematerialize();
}
看到这里我们就知道为什么通过直接缓存刷新之后,再使用缓存会报错?在刷新的代码中,直接将缓存类型T
放入缓存,而不是Signal<T>
,当再次使用缓存时候,自然不能做类型转换(List<Signal<VALUE>>) fromCache
,于是报出类型转换错误。
.build(new CacheLoader<String, Object>() {
@Override
public YourType load(String key) throws Exception {
return getYourTypeValue(key); // 同步代码
}
})
总结
1. cacheMap查询 -> 如果没有 -> 通过 onCacheMissResume中参数的方法查询数据
-> 通过materialize()方法将数据具象化,把T转换为Signal<T>
-> 写入缓存map -> 通过dematerialize()方法,反具象化,把Sinal<T>转T -> 交给下步使用
2. cacheMap查询 -> 如果有 -> 通过dematerialize()方法,反具象化,把Sinal<T>转T -> 交给下步使用
如何改进?(课后题 :D)
从上面的代码分析我们可以看到,缓存中存放的是Singal<T>
,所以我们需要将T
转换成Singal<T>
放入缓存。
.build(new CacheLoader<String, Object>() {
@Override
public YourType load(String key) throws Exception {
return Signal.next(getYourTypeValue(key)); // 同步代码
}
})
通过Signal.next(getYourTypeValue(key))
这样的方法,放入缓存后是否可用,待各位读者自行验证。