RxJava 之教你写响应式框架 (三)

2,179 阅读8分钟

教你写响应式框架(二)中,我们对原始代码进行了初步的改造,如果没看过上篇的可以先看一下.那么在今天我们仍然是在原有项目的基础上进行改造,在改造之前,我们想先提出两个目标:

  • 增加map操作符的支持,实现对数据的转换
  • 支持链式调用

首先,我们仍然是贴一下上面的代码,以便你能够更容易理解我的思路:


public class Observable {
    protected OnAttach onAttach;

    public Observable(OnAttach onAttach) {
        this.onAttach = onAttach;
    }

    public static  Observable create(OnAttach onAttach) {
        return new Observable(onAttach);
    }


    public void attach(Observer observer) {
        onAttach.call(observer);
    }

    public interface OnAttach {
        void call(Observer observer);
    }

}


public interface Observer {
    void update(T state);
}


public class Client {
    public static void main(String[] args) {

        
        Observable.OnAttach onAttach = new Observable.OnAttach() {
            @Override
            public void call(Observer observer) {
                ArrayList list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
                observer.update(list);

            }
        };

        
        Observable observable = Observable.create(onAttach);

        
        Observer observer = new Observer>() {

            @Override
            public void update(ArrayList state) {
                state.stream().forEach(p -> {
                    System.out.println(p);
                });
            }
        };

        
        observable.attach(observer);

    }
}

接下来我们需要对我们的两个目标做一下描述:

map操作符

首先我们来定义一下map的操作

map操作即映射操作,可以对数据进行变换.比如说”hello”变换为”hello world”或者是”1”字符串转换成Integer类型等.现在我们暂且不谈怎么实现.

方法链

接下来我们来考虑链式调用,所谓的链式调用就是我们常说的方法链.如果你对JQuery熟悉的化,那最好不过了,可以直接略过此处的说明了.
那么什么是方法链呢?
举例说明,我们存在这样的代码:

Class cla=new Class();
cla.setWidth();
cla.setHeight();
cla.setWeight();

这样的代码看起来很繁杂,能不能简化成这样呢?

Class cla=new Class();
cla.setWidth().setHeight().setWeight();

类似简化后的代码就是方法链,即方法的连续调用.在java中我们怎么实现呢?如果你看过Effective Java,那肯定会有解决方案.实现方法链,通常有两种手段:

  1. 每个方法返回对象自身
  2. 每个方法返回一个新的对象(和原方法所属对象同类型的对象)

第一种方法常用在构造器需要多个参数的情况下,而其中有些参数却有不是必须的.如:

public class Person {
    private String name;
    private int age;

    private float weight;
    private float height;

    public Person(String name, int age) {
        this.age = age;
        this.name = name;
    }

    public Person(String name, int age, float weight) {
        this.name = name;
        this.age = age;
        this.weight = weight;
    }

    public Person(String name, int age, float weight, float height) {
        this.name = name;
        this.age = age;
        this.weight = weight;
        this.height = height;
    }

    public void setWeight(float weight) {
        this.weight = weight;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public void setHeight(float height) {
        this.height = height;
    }
}

这时候在使用Person类的时候可能就是如下代码了:

Person p=new Person("sbbic",18)
p.setWight(50)
p.setHeight(172)
....

如果属性非常多,那还用这样的方式就显得繁琐,这明显和我们追求”高效简洁”这一目标相违背嘛,怎么能忍?Ok,现在我们来对这段代码进行改造:

public class Person {
    private String name;
    private int age;

    private float weight;
    private float height;

    public Person(int age, String name) {
        this.age = age;
        this.name = name;
    }

    public Person(String name, int age, float weight) {
        this.name = name;
        this.age = age;
        this.weight = weight;
    }

    public Person(String name, int age, float weight, float height) {
        this.name = name;
        this.age = age;
        this.weight = weight;
        this.height = height;
    }

    public Person setWeight(float weight) {
        this.weight = weight;
        return this;
    }

    public Person setName(String name) {
        this.name = name;
        return this;
    }

    public Person setAge(int age) {
        this.age = age;
        return this;
    }

    public Person setHeight(float height) {
        this.height = height;
        return this;return this;
    }
}

现在我们就可以想使用JQuery一样来进行操作:

Person p=new Person("sbbic",20)
p.setWeight(50).setHeight(172).setAge(24)

上面我们简单的说明了如何用第一种方式来实现方法链,而第二种和第一种的实现很类似,只不过在每个方法后返回的不是this而是一个新的Person对象.

到现在为止,开胃小菜已经吃了一半了,下面我们继续!

这次,我们先来从整体的角度来考虑:以后的操作符不只会有map,可能还有filter之类的操作,因此在设计上需要尽可能的抽象;另外,对于所有的操作符最好是支持链式操作.

现在我们从代码开始解剖实现的过程:

观察者接口,没做任何变化

public interface Observer {
    void update(T t);
}

操作符接口(Operator)父接口.定义这个接口的目的是为了解决java中”函数不是一等公民”这问题:函数不能像普通的数据类型一样充当参数.

public interface IFun {
    R call(T t);
}

被观察者,在这里需要重点关注map()方法和lift()方法.

public class Observable {
    protected OnAttach onAttach;

    public Observable(OnAttach onAttach) {
        this.onAttach = onAttach;
    }

    public static  Observable create(OnAttach onAttach) {
        return new Observable(onAttach);
    }


    public void attach(Observer observer) {
        onAttach.call(observer);
    }

    public  Observable map(IFun fun) {
        OperatorMap operatorMap = new OperatorMap(fun);
        
        Observable observable = lift(operatorMap);
        return observable;

    }

    public interface OnAttach {
        void call(Observer observer);
    }

    
    public  Observable lift(Operator operator) {
        return new Observable<>(new OnAttach() {
            @Override
            public void call(Observer observer) {
                Observer call = operator.call(observer);
                Observable.this.onAttach.call(call);

            }
        });
    }

    
    public interface Operator extends IFun, Observer> {}

}

map操作符的具体实现,在这里重点关注一下泛型位置的颠倒,并自己想想为什么要这么设计

public class OperatorMap implements Observable.Operator {
    private IFun convert;

    public OperatorMap(IFun convert) {
        this.convert = convert;
    }

    @Override
    public Observer call(Observer observer) {
        return new Observer() {
            @Override
            public void update(T t) {
                observer.update(convert.call(t));

            }
        };
    }
}

客户端测试代码,这里我尽可能”繁琐”,以便能帮助你快速的理解:

public class Client {
    public static void main(String[] args) {

        
        Observable.OnAttach onAttach0 = new Observable.OnAttach() {
            @Override
            public void call(Observer observer) {

                observer.update("test");

            }
        };

        
        Observable observable0 = Observable.create(onAttach0);

        Observable observable1 = observable0.map(new IFun() {

            @Override
            public String call(String s) {
                return s + "_map";
            }
        });

        Observable observable2 = observable1.map(new IFun() {

            @Override
            public String call(String s) {
                return s + "_???";
            }
        });

        
        Observer observer0 = new Observer() {
            @Override
            public void update(String t) {
                System.out.println(t);
            }
        };

        
        observable2.attach(observer0);

    }
}

设计分析

先来分析客户端测试代码的执行过程,进而考虑为什么这么设计.

首先我们创建observable0,该对象持有成员变量onAttach0.
随后在observable0的基础上,我们执行map操作,我们看看这个map操作做了什么事情:

1.将map中定义的操作封装成OperatorMap对象
2.调用lift方法生成新的observable以便支持链式调用

lift方法看起来很简单,就是生成新的observable嘛。对,确实看你起来简单。在这里新生成了observable1对象,假设该对象持有成员变量为onAttach1.

在observable1对象上继续执行map操作,生成了observable2对象,假设该对象持有的成员变量为onAttach2.

我们继续往下,可以看到我们创建了一个观察者对象observer0,随后我们调用了
observable2.attach(observer0);到这一步,整个程序才算是正式开始执行了。我们来分析一下这个过程
observable2中,onAttach2的call方法开始执行,其接受的观察者是observer0对象,也就是执行以下代码:

 Observer call = operator.call(observer)
 Observable.this.onAttach.call(call)

这段代码要做什么呢?

Observer call = operator.call(observer);

该代码执行的map操作符的call方法,此处的Observer实则是刚才我们创建的observer0.在OperatorMap操作中,我们看call方法,这个方法的目的就是返回一个新的Observer,我们假设它的名字是observer1,此处的observer1中实则持有observer0的引用,如下:

@Override
    public Observer call(Observer observer) {
        return new Observer() {
            @Override
            public void update(T t) {
                observer.update(convert.call(t));

            }
        };
    }

我们继续往下走,上面我们说到新生成了observer1对象.该对象有什么用呢?我们看这行代码:

Observable.this.onAttach.call(call)

这里涉及到了很基础的一点:Observabe.this是哪个对象?observable0还是observable1还是observable2呢??这也就引出了一个最基本的问题:Observable.this在匿名类中指的是外部类对象还是内部类对象呢?这个问题,如果你还有疑惑的化,说明你需要回去巩固一下基础知识了.

observable2对像是在observable1对象的lift()方法中通过匿名的方式创建的,因此此处的Observable.this指的是observable1.因此上面的代码实际上执行了observable1.onAttach.call(observer1).

到现在你发现我们当前操作的对象变成observable1了,反应过来了嘛?执行过程和observable2.onAttach.call(observer0)类似,我们直接看执行结果是什么:
首先生成了新的Observer对象observer2,observer2实则拥有observer1的引用;然后是observable0.onAttach.call(observer2)的执行.

以attach()方法为界限,在这之前对象的时间顺序是:

生成顺序:
observable0–>observable1–>obserbable2

在这之后是

访问顺序:
observable2–>observable1–>observable

于此同时,也生成了与之对应的Observer对象.

生成顺序
observer1–>observer2

到现在为止,我们实际上已经形成了一条关于observable对象的对象链。如果还记得链表的话,想必是不会陌生的:
这里写图片描述
在这里生成的对象链实际上是这样:
这里写图片描述

现在我们已经回到了最开始的observable0对象上,我们继续往下看:
observable0.onAttach.call(observer2)
这里observable0对象的onAttach对象是我们开始时定义的onAttach0,它的call方法做了什么事情呢:

@Override
public void call(Observer observer) {
      observer.update("test");
   }

可以看出实际上就是通知观察者,这个观察者是谁呢?很显然就是我们上文通过匿名方式生成的observer2.换言之,observer2是observable0的观察者.
我们看observer2的方法update()做了什么:

  @Override
    public Observer call(Observer observer) {
        return new Observer() {
            @Override
            public void update(T t) {
                observer.update(convert.call(t));

            }
        };
    }

convert.call(t)方法容易理解,就是第一次定义的map操作嘛,那这里observer对象是谁?(忘了的童鞋自行看lift()方法分析的过程)此observer就是observer1.因此此处其实是执行了:observer1.update("test"+"_map");
对于observer1的update方法来说,因为同样是map操作,因此实际上执行的是:
observer0.update("test_map"+"_???");
现在我们继续看observer0的update()方法,observer0是我们显示定义的:

 Observer observer0 = new Observer() {
            @Override
            public void update(String t) {
                System.out.println(t);
            }
        };

也就是执行了
System.out.pringln(test_map_???);
到现在为止,整个执行过程我们已经分析清楚了,我们用一张图来描述attach()方法之后的执行过程:
这里写图片描述
实际上你会发现observer2是observable0的观察者,observer1是observable1的观察者,observer0是observable2的观察者.当找到最初的observable0之后,数据才开始正式被observer2处理,处理完的数据交给observer1继续处理,最后是observer0.其实,每个observer即是观察者,也是被观察者.

ok,到现在为止,这个简单的响应式框架看起来好像有点味道了,不过客户端代码也太low了把,让它变得高大尚一点:

public class Client {
    public static void main(String[] args) {

        Observable.create(new Observable.OnAttach() {
            @Override
            public void call(Observer observer) {
                observer.update("test");
            }
        }).map(s -> s + "_map").map(s -> s + "_???").attach(new Observer() {
            @Override
            public void update(String t) {
                System.out.println(t);
            }
        });

    }
}

我们来运行一下:

test_map_???

看起来还不错的样子,可惜不能吃啊.

完整项目见github.com/closedevice…