阅读 2342

给 Java 和 Android 构建一个简单的响应式Local Cache

夕阳.JPG

一. 为何要创建这个库

首先,Local Cache 不是类似于 Redis、Couchbase、Memcached 这样的分布式 Cache。Local Cache 适用于在单机环境下,对访问频率高、更新次数少的数据进行存放。因此,Local Cache 不适合存放大量的数据。

Local Cache 特别适合于 App,也适合在 Java 的某些场景下使用。

我们的 App 使用 Retrofit 作为网络框架,并且大量使用 RxJava,因此我考虑创建一个 RxCache 来缓存一些必要的数据。

RxCache 地址:github.com/fengzhizi71…

二. 如何构建 RxCache

2.1 RxCache 的基本方法

对于 Local Cache,最重要是需要有以下的这些方法:

<T> Record<T> get(String key, Type type);

<T> void save(String key, T value);

<T> void save(String key, T value, long expireTime);

boolean containsKey(String key);

Set<String> getAllKeys();

void remove(String key);

void clear();
复制代码

其中,有一个 save() 方法包含了失效时间的参数expireTime,这对于 Local Cache 是比较重要的一个方法,超过这个时间,这个数据将会失效。

既然是 RxCache,对于获取数据肯定需要类似这样的方法:

<T> Observable<Record<T>> load2Observable(final String key, final Type type) ;

<T> Flowable<Record<T>> load2Flowable(final String key, final Type type);

<T> Single<Record<T>> load2Single(final String key, final Type type);

<T> Maybe<Record<T>> load2Maybe(final String key, final Type type);
复制代码

也需要一些 Transformer 的方法,将 RxJava 的被观察者进行转换。在 RxCache 中,包含了一些默认的 Transformer 策略,特别是使用 Retrofit 和 RxJava 时,可以考虑结合这些策略来缓存数据。

以 CacheFirstStrategy 为例:

/**
 * 缓存优先的策略,缓存取不到时取接口的数据。
 * Created by tony on 2018/9/30.
 */
public class CacheFirstStrategy implements ObservableStrategy,
        FlowableStrategy,
        MaybeStrategy  {

    @Override
    public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {

        Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type);

        Flowable<Record<T>> remote = source
                .map(new Function<T, Record<T>>() {
                    @Override
                    public Record<T> apply(@NonNull T t) throws Exception {

                        rxCache.save(key, t);

                        return new Record<>(Source.CLOUD, key, t);
                    }
                });

        return cache.switchIfEmpty(remote);
    }

    @Override
    public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {

        Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type);

        Maybe<Record<T>> remote = source
                .map(new Function<T, Record<T>>() {
                    @Override
                    public Record<T> apply(@NonNull T t) throws Exception {

                        rxCache.save(key, t);

                        return new Record<>(Source.CLOUD, key, t);
                    }
                });

        return cache.switchIfEmpty(remote);
    }

    @Override
    public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {

        Observable<Record<T>> cache = rxCache.<T>load2Observable(key, type);

        Observable<Record<T>> remote = source
                .map(new Function<T, Record<T>>() {
                    @Override
                    public Record<T> apply(@NonNull T t) throws Exception {

                        rxCache.save(key, t);

                        return new Record<>(Source.CLOUD, key, t);
                    }
                });

        return cache.switchIfEmpty(remote);
    }
}
复制代码

2.2 Memory

RxCache 包含了两级缓存: Memory 和 Persistence 。

RxCache.png

Memory:

package com.safframework.rxcache.memory;

import com.safframework.rxcache.domain.Record;

import java.util.Set;

/**
 * Created by tony on 2018/9/29.
 */
public interface Memory {

    <T> Record<T> getIfPresent(String key);

    <T> void put(String key, T value);

    <T> void put(String key, T value, long expireTime);

    Set<String> keySet();

    boolean containsKey(String key);

    void evict(String key);

    void evictAll();
}
复制代码

它的默认实现 DefaultMemoryImpl 使用 ConcurrentHashMap 来缓存数据。

在 extra 模块还有 Guava Cache、Caffeine 的实现。它们都是成熟的 Local Cache,如果不想使用 DefaultMemoryImpl ,完全可以使用 extra 模块成熟的替代方案。

2.3 Persistence

Persistence 的接口跟 Memory 很类似:

package com.safframework.rxcache.persistence;

import com.safframework.rxcache.domain.Record;

import java.lang.reflect.Type;
import java.util.List;

/**
 * Created by tony on 2018/9/28.
 */
public interface Persistence {

    <T> Record<T> retrieve(String key, Type type);

    <T> void save(String key, T value);

    <T> void save(String key, T value, long expireTime);

    List<String> allKeys();

    boolean containsKey(String key);

    void evict(String key);

    void evictAll();
}
复制代码

由于,考虑到持久层可能包括 Disk、DB。于是单独抽象了一个 Disk 接口继承 Persistence。

在 Disk 的实现类 DiskImpl 中,它的构造方法注入了 Converter 接口:

public class DiskImpl implements Disk {

    private File cacheDirectory;
    private Converter converter;

    public DiskImpl(File cacheDirectory,Converter converter) {

        this.cacheDirectory = cacheDirectory;
        this.converter = converter;
    }

    ......
}
复制代码

Converter 接口用于对象储存到文件的序列化和反序列化,目前支持 Gson 和 FastJSON。

Converter 的抽象实现类 AbstractConverter 的构造方法注入了 Encryptor 接口:

public abstract class AbstractConverter implements Converter {

    private Encryptor encryptor;

    public AbstractConverter() {
    }

    public AbstractConverter(Encryptor encryptor) {

        this.encryptor = encryptor;
    }

    ......
}
复制代码

Encryptor 接口用于将存储到 Disk 上的数据进行加密和解密,目前 RxCache 支持 AES128 和 DES 两种加密方式。不使用 Encryptor 接口,则存储到 Disk 上的数据是明文,也就是一串json字符串。

三. 支持 Java

在 example 模块下,包括了一些常见 Java 使用的例子。

例如,最简单的使用:

import com.safframework.rxcache.RxCache;
import com.safframework.rxcache.domain.Record;
import domain.User;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;

/**
 * Created by tony on 2018/9/29.
 */
public class Test {

    public static void main(String[] args) {

        RxCache.config(new RxCache.Builder());

        RxCache rxCache = RxCache.getRxCache();

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {

            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
}
复制代码

带 ExpireTime 的缓存测试:

import com.safframework.rxcache.RxCache;
import com.safframework.rxcache.domain.Record;
import domain.User;

/**
 * Created by tony on 2018/10/5.
 */
public class TestWithExpireTime {

    public static void main(String[] args) {

        RxCache.config(new RxCache.Builder());

        RxCache rxCache = RxCache.getRxCache();

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u,2000);

        try {
            Thread.sleep(2500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Record<User> record = rxCache.get("test", User.class);

        if (record==null) {
            System.out.println("record is null");
        }
    }
}
复制代码

跟 Spring 整合并且 Memory 的实现使用 GuavaCacheImpl:

import com.safframework.rxcache.RxCache;
import com.safframework.rxcache.extra.memory.GuavaCacheImpl;
import com.safframework.rxcache.memory.Memory;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;

/**
 * Created by tony on 2018/10/5.
 */
@Configurable
public class ConfigWithGuava {

    @Bean
    public Memory guavaCache(){
        return new GuavaCacheImpl(100);
    }

    @Bean
    public RxCache.Builder rxCacheBuilder(){
        return new RxCache.Builder().memory(guavaCache());
    }

    @Bean
    public RxCache rxCache() {

        RxCache.config(rxCacheBuilder());

        return RxCache.getRxCache();
    }
}
复制代码

测试一下刚才的整合:

import com.safframework.rxcache.RxCache;
import com.safframework.rxcache.domain.Record;
import domain.User;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * Created by tony on 2018/10/5.
 */
public class TestWithGuava {

    public static void main(String[] args) {

        ApplicationContext ctx = new AnnotationConfigApplicationContext(ConfigWithGuava.class);

        RxCache rxCache = ctx.getBean(RxCache.class);

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {
            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
}
复制代码

四. 支持 Android

为了更好地支持 Android,我还单独创建了一个项目 RxCache4a: github.com/fengzhizi71…

它包含了一个基于 LruCache 的 Memory 实现,以及一个基于 MMKV(腾讯开源的key -value存储框架) 的 Persistence 实现。

我们目前 App 采用了如下的 MVVM 架构来传输数据:

MVVM.png

未来,希望能够通过 RxCache 来整合 Repository 这一层。

五. 总结

目前,RxCache 完成了大体的框架,初步可用,接下来打算增加一些 Annotation,方便其使用。

RxCache 系列的相关文章:

  1. ReentrantReadWriteLock读写锁及其在 RxCache 中的使用
  2. 堆外内存及其在 RxCache 中的使用
  3. Retrofit 风格的 RxCache及其多种缓存替换算法
  4. RxCache 整合 Android 的持久层框架 greenDAO、Room

Java与Android技术栈:每周更新推送原创技术文章,欢迎扫描下方的公众号二维码并关注,期待与您的共同成长和进步。

关注下面的标签,发现更多相似文章
评论