RxJava练武场之——基于Observable网络框架的搭建

2,181 阅读9分钟

RxJava练武场是一个rxjava在项目中应用的小系列,包括:

Observable网络框架设计的背景和原则

Observable网络框架建立的原因

两个问题:

  • Retrofit已经对网络请求做了封装,为什么还要封装?

    答:网络请求中对于请求流程、配置、入参封装、加解密、异常处理每个app都是固定不变的,如果业务每次请求都自己处理,会存在冗余代码,且质量不易保证。所以我们需要基于Retrofit对这些流程和操作做二次封装,并对调用方式进行统一,我们称之为网络框架。

  • 框架封装Observable方式是什么?

    答:传统网络框架封装方式采用callback/listener异步回调网络请求结果。但是这种callback的方式,没有利用到Retrofit的一大优势--rxjava调用,所以我们要基于rxjava调用方式,封装一个基于返回Observable的网络请求框架。 以下所说网络框架,均指基于返回Observable的网络请求二次封装框架。

Observable网络框架设计目标

  • T1:业务对Request的业务params、url可轻松配置,框架对params组合和加密统一处理
  • T2:框架对返回Response解密、code判断等统一处理,并直接返回业务JavaBean结果
  • T3:返回值Observable,使网络请求可作为rxjava调用链中的一环
  • T4:调用入口统一、封装流程,对业务透明
  • T5:请求支持cancle机制、progressBar等通用处理

Observable网络框架设计原则

设计原则:

  1. 网络框架Api返回Observable对象,并作为网络请求事件的生产者:

    生产者负责请求的发起接收,和返回数据的预处理。

  2. 业务注册Observer类,作为消费者。

    只负责对返回数据的响应

这是为了T3的设计目标,生产者和消费者界限明确,做到完全解耦,为网络请求与其他rxjava异步调用的整合打基础。

这篇文章详细讲述框架中生产者的实现,消费者的实现在RxJava练武场之——Observable网络框架的解耦和复用中介绍


Observable网络框架如何实现

在Observable的创建过程中,框架如何封装?

首先我们需要一个Manager或Helper全局句柄,通过他可以发起网络请求,一般设计为单例全局持有,有利于网络请求一些资源的共用。 我们暂定为NetHelper,其网络请求Api定义为:

private static <R extends HttpResponse<?>> Observable sendRequest(final HttpRequest<R> orgRequest) 

sendRequest方法中,返回Observable对象,通过泛型规范Response的类型(T2目标的基础),HttpRequest(T1目标的基础)

第一步 HttpRequest定义

定义Request接口,这个接口是retrofit指定的方式。Retrofit本来希望你这么做的:

public interface LoginRequest {

    @POST("{/get/login}")
    Observable<LoginInfo> login(@FieldMap Map<String, String> params);
}

在LoginRequest中你定义了url,Response JavaBean, 入参params的方式(map) 通过 LoginRequest loginRequest = retrofit.create(LoginRequest.class); 动态代理的方式帮你把request的配置都生成好了。

但是map的传入这部分,需要业务自行处理入参的组合和加密,当然业务可以通过工具类的方式来规范这一步骤(网上多是此种方案),但这样会让业务参与到请求流程来了,不符合T4的目标。

我们希望业务定义LoginRequest如下:

public class LoginRequest extends HttpRequest<R extends HttpResponse> {

    private String userAccount;
    private String userPwd;
    private int accountType;

    @Override
    protected String getURL() {
        return "/get/login";
    }
}

业务只关心入参和url的定义,不需要关心入参的组合和加密方式(T1目标),通过NetHelper.send(loginRequest)一个调用就能拿到结果。这种定义方式不仅和Retrofit原生方式一样将url和Response类型都定义了,还一起将params也定义了进来。

所以我们不直接使用Retrofit的接口定义方式,封装了一个专为业务定义的HttpRequest类,最终也由这个类向Retrofit接口传递数据。

要满足上述调用方式,满足两个前提条件

  1. HttRequest基类,要处理入参的组合和加密处理
  2. Retrofit对应的Request定义接口,要改为通用的方式,以适应不同的HttpRequest子类的定义

Request接口定义如下:

public interface Request {

    @POST("{url}")
    Observable<JSONObject> postJSONResult(@Path(value="url",encoded =true) String url, @FieldMap Map<String, String> params);
}

我们定义的Observable泛型是通用的JSONObject,url是通过入参方式传入。

第二步 创建retrofit;

NetHelper.java中

// 初始化okhttp
OkHttpClient client = new OkHttpClient.Builder()
        .build();
/**
 *OkHttpClient每次请求的时候都要创建,注意:OkHttpClient.Builder()中有ConnectionPool作为OkHttp的   	*连接池要复用,否则请求过多时容易导致内存溢出
**/

// 初始化Retrofit
retrofit = new Retrofit.Builder()
        .client(client)
        .baseUrl(Request.HOST)
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .addConverterFactory(MyConverterFactory.create())
        .build();

一般addConverterFactory配置是GsonConverterFactory,用于把Response通过GSon转为javaBean(Request接口中定义的类型),由于我们定义为了JsonObject,所以我们使用MyConverterFactory自定义实现

第三步 生成Observable

Observable的真正创建,我们放到Request基类中

public abstract class HttpRequest<R extends HttpResponse>{
    protected abstract String getURLAction();
    
    //Observable的创建
    public Observable getObservable(Retrofit retrofit) {
        Request request = retrofit.create(Request.class);
		return request.postJSONResult(getURLAction(),getURLParam());
    }
    
    //子类中复写该接口的url
    public String getBaseURL(){
        //return base url;
    }
    
    public SortedMap<String, String> getURLParam() {
    	//return 对HttpRequest子类(业务类),定义的params,进行组合和加密
    	//通用的组合和加密逻辑就在此处。
    }
    
    //response类型解析
    protected Type type;

    public ObservableMapiRequest(BaseContext context) {
        super(context);
        initType();
    }

    //泛型R类型的获取,拿到HttpResponse的类型
    private void initType() {
        Type superClass = getClass().getGenericSuperclass();

        this.type = ((ParameterizedType) superClass).getActualTypeArguments()[0];

    }

    public Type getType() {
        return type;
    }
}

这一步中,我们将HttpRequest子类(例如LoginRequest)中定义的url params HttpResponse都获取到了。

第四步 NetHelper Api的调用

以上三步,已经初步将Observable返回。下面的处理NetHelper中Api的调用,也是框架的重点和核心:

private static <R extends HttpResponse<?>> Observable sendRequest(final ObservableRequest<R> orgRequest){
        return NetHelper.getObservable(orgRequest)
                //对Response的JsonObject进行类型转化
                .map(new JavaBeanFunc(orgRequest.getType()))
                //添加拦截器、Log等
                .filter(new LogInterceptor(orgRequest))
                //对Response预处理,code分类等
                .compose(ResponseTransformer.handleResult())
                //配置线程
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());

    }

NetHelper.getApiObservable方法后,再加上网络请求的线程配置,这时候业务subscribe消费者,就可以直接得到解密后的JsonObject了。注意此时只是JSONObject,我们需要转换成我们需要的Response。

我们在HttpRequest类中看到了,定义HttpRequest子类时,是需要传入Response类型的(就是R),在HttpRequest类中已经将Response的类型解析出来,并保存了。在JavaBeanFunc中进行了转化

public class JavaBeanFunc<J extends JSONObject,T extends HttpResponse> implements Function<J,T> {
    Type mCon;
    public JavaBeanFunc(Type con){
        mCon = con;
    }
    @Override
    public T apply(J jsonObject) throws Exception {
        if (jsonObject != null){
            T response = jsonObject.toJavaObject(mCon);
            return response;
        } else {
            return null;
        }
    }
}

同时配置了拦截器,用于log输出,和异常的拦截,也可以做网络性能指标的记录等

public class LogInterceptor<R extends HttpResponse<?>> implements Predicate<R> {
    private HttpRequest<R> orgRequest;

    public LogInterceptor(HttpRequest<R> request){
        orgRequest = request;
    }

    @Override
    public boolean test(R response) throws Exception {
        boolean isPredicate = false;
        if (orgRequest != null && response != null) {
            HttpHelper.printHttpLog(orgRequest, JSONObject.toJSONString(response));
            isPredicate = true;
        }
        return isPredicate;
    }
}

response解析完毕之后,还没有完事;必须对response的正常和异常两种情况做一个判断,哪些情况作为onNext传递,哪些情况作为onError传递,也要定义清楚。此处我们采用ObservableTransformer来对数据流处理。

有些文章对于progressbar的控制也放到这里,我认为并不符合框架的设计原则,也就无法实现目标T3,不属于生产者该做的事情。

public static <T extends Serializable> ObservableTransformer<T, T> handleResult() {
    return upstream -> upstream
            .onErrorResumeNext(new ErrorResumeFunction<T>())
            .flatMap(new ResponseFunction<T>());
}

private static class ErrorResumeFunction<T extends Serializable> implements Function<Throwable, ObservableSource<? extends T>> {

    @Override
    public ObservableSource<? extends T> apply(Throwable throwable) throws Exception {
        return Observable.error(CustomException.handleException(throwable));
    }
}

private static class ResponseFunction<T extends Serializable> implements Function<T, ObservableSource<T>> {

    @Override
    public ObservableSource<T> apply(T tResponse) throws Exception {
        int code = tResponse.getCode();
        String message = tResponse.getMsg();

        if (code == SUCCESS.value()) {
            return Observable.just(tResponse);
        } else {
            return Observable.error(new ApiException(code, message));
        }
    }
}

至此,你可能有两个疑问,一个是response中code的判定可以在observer中处理吗,另一个是服务器错误和业务错误为何都作为error抛出。

第一个问题: code值的判定不可以在observer中处理,而必须在Observable一端处理。因为Observable形式的网络请求是作为数据流中的一环出现的,可能当前网络请求只是一连串异步调用(rxjava调用)的一环。这是实现目标T3的关键点。 第二个问题: response中code!=SUCCESS是业务错误的情况,必须向数据流中发出,让业务处理此异常。(那同时对于Response的定义也是,code!=SUCCESS必须是不需要业务处理的情况才行) 两种错误都抛出error(内部code不同),方便架构使用者在事件响应时,既能捕捉所有错误,又能区分错误的类型。

框架的封装性和Request cancle机制

框架的封装性

看下最终使用的情况

LoginRequest request = new LoginRequest();
request.setUserAccount("888");
request.setUserPwd("888");

ApiHelper.send(request)
    .subscribe(new Consumer<LoginResponse>() {
                    @Override
                    public void accept(LoginResponse s) throws Exception {

                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
                });

业务在使用上很简洁,LoginRequest和LoginResponse的定义也是业务关注的内容。基本达到T4的目标

Request的Cancle机制

Request的cancle机制,一般原理是这样:在Activity/Fragment关闭时,需要cancle调正在请求中的网络,所以需要BaseActivity/BaseFragment的支持。 方案有二,这篇文章就不赘述了,因为都是比较成熟的方案。

  1. 框架中加入RxLifecycle的绑定,这样在调用NetHelper的Api时,要传入BaseActivity/BaseFragment的实例
  2. 自己实现:在框架返回Observable的doOnSubscribe方法中,对当前request的disposable进行保存,并设置tag保存disposable和activity的对应关系,在BaseActivity的onDestroy方法中统一对disposable进行清理。

框架基于Rxjava的链式调用

我们常常遇到这样一种场景,一个网络请求往往基于上一个网络请求的结果,或者一个页面的数据来自两个网络请求的组合,我们的网络框架是如何满足这种需求的呢?这也是我们这套框架设计的出发点和落脚点:

RegisterRequest registerRequest = new RegisterRequest();
registerRequest.setUserAccount(account.getText());
registerRequest.setUserPwd(pwd.getText());
registerRequest.setUserSex(sex.getType());

ApiHelper
    .send(registerRequest)
    .flatMap(new Function<RegisterResponse, ObservableSource<HttpResponse<LoginResponse>>>() {
            @Override
            public ObservableSource<HttpResponse<LoginResponse>> apply(RegisterResponse response) throws Exception {
                LoginRequest loginRequest = new LoginRequest();
                loginRequest.setUserAccount(account.getText());
                loginRequest.setUserPwd(pwd.getText();
                return ApiHelper.send(loginRequest);
                
            }
    })
    .subscribe(new Consumer<LoginResponse>() {
                    @Override
                    public void accept(LoginResponse s) throws Exception {

                    }
            }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
            });

这样我们就完成了一个先注册后登陆的网络请求流程,将网络框架融入了rxjava的调用链之中。

总结

这篇文章着重对基于Observable的网络框架中,Observable生产者部分进行了阐述,在‘框架的封装性’一节所举的例子中,我们只采用了最普通的Consumer来进行事件的消费,我们在下一篇文章中RxJava练武场之——Observable网络框架的解耦和复用会专门对消费者进行封装,使该框架的规范性和扩展性更强。