阅读 161

RxJava系列(一):RxJava 观察者模式

什么是RxJava

RxJava是ReactiveX在JVM上的一个实现,使用可观察序列来编写异步和基于事件的程序的库。它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的运算符,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。

什么是观察者模式

观察者模式也被称为发布-订阅(Publish/Subscribe)模式,它属于行为型模式的一种。观察者模式定义了一种一对多的依赖关系,一个主题对象可被多个观察者对象同时监听。当这个主题对象状态变化时,会通知所有观察者对象并作出相应处理逻辑。

观察者模式角色

  • 抽象被观察者(Subject):即抽象主题,它把所有对观察者对象的引用保存在一个集合中,可以有任意数量的观察者,抽象主题提供一个接口,可以增加、删除、通知观察者对象。
  • 抽象观察者(Observer):抽象观察者,是观察者者的抽象类,它定义了一个更新接口,使得在得到主题更改通知时更新自己。
  • 具体被观察者(Concrete Subject):将有关状态存入具体观察者对象,在具体主题的内部状态发生改变时,给所有注册过的观察者发送通知。
  • 具体观察者(Concrere Observer):实现抽象观察者定义的更新接口,以便在得到主题更改通知时更新自身的状态。

观察者模式实现(以微博粉丝关注明星为例子)

1.创建抽象被观察者(Subject):

public interface Star {

    /**
     * 添加粉丝
     */
    void addFan(Fan fan);
    
    /**
     * 取消粉丝
     */
    void removeFan(Fan fan);
    
    /**
     * 分享动态
     */
    void notifyFan(String message);
    
}
复制代码

2.创建抽象观察者(Observer)

public interface Fan {

    /**
     * 更新动态
     */
    void update(String message);
    
}
复制代码

3.创建具体被观察者(Concrete Subject 具体明星)

public class AStar implements Star{

    private List<Fan> fanList = null;
    
    public AStar(){
        fanList = new ArrayList<Fan>();
    }
    
    @Override
    public void addFan(Fan fan){
        fanList.add(fan);
    }
    
    @Override
    public void removeFan(Fan fan){
        fanList.remove(fan);
    }
    
    @Override
    public void notifyFan(String message){
        for(Fan fan : fanList){
            fan.update("AStar 发布了 ** 信息");
        }
    }
}
复制代码

4.创建具体观察者(Concrere Observer 具体粉丝)

public class AFan implements Fan{
    
    private String fanName;
    
    public AFan(String fanName){
        this.fanName = fanName;
    }
    
    @Override
    public void update(String message){
        Log.d("AFan 收到了 AStar 发布的消息");
    }
    
}
复制代码

RxJava观察者模式

RxJava三个基本元素

Observable(被观察者),Observer(观察者),subscribe(订阅)。

Rxjava中的抽象被观察者(抽象主题Subject)

Observable 是一个抽象类,实现了ObservableSource抽象接口。

public abstract class Observable<T> implements ObservableSource<T> {
......
}
复制代码

ObservableSource中subscribe()用来订阅观察者,所以ObservableSource相当于抽象被观察者。

public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}
复制代码

RxJava中的抽象观察者(Observer)

通过ObservableSource的subscribe()方法可知抽象观察者为里面的参数对象Observer。

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}
复制代码

Rxjava中的具体被观察者(Concrete Subject)

/**
 * 创建Observable
 */
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
    }
});

/**
 * Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
 * @param <T> the element type
 * @param source the emitter that is called when an Observer subscribes to the returned {@code Observable}
 * @return the new Observable instance
 * @see ObservableOnSubscribe
 * @see ObservableEmitter
 * @see Cancellable
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码

通过create方法源码可知ObservableCreate为具体被观察者。

RxJava中的具体观察者(Concrere Observer)

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
};
复制代码

上述实现observer接口的observer为具体观察者。

Rxjava订阅实现

observable.subscribe(observer);
复制代码
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
复制代码