Android 写一个属于自己的Rxjava(一)详解手机开发

目录

Android 写一个属于自己的Rxjava(一)

Android 写一个属于自己的Rxjava(二)

背景

之所以想要自己动手写一个简单的Rxjava,并不是想证明自己多厉害,而是借助动手来理解Rxjava的思想和源码,实话实说,Rxjava是我看过的源码里面名字取得最迷糊的。看了三四天,硬是把自己搞迷糊了。

相反,我尝试自己做一个简单的轮子,发现其实Rxjava的整体思想和实现并不难,但层层的封装真是让人抓不住头脑。不过,只要抓住规律,其实理解起来还是挺简单的。

重点在于分清楚:上游发射事件,下游接收事件

哈哈,我并不是在说废话,只要分清楚哪些操作符是作用在上游,哪些作用在下游,在此基础上对上游或者下游封装多一层就成了Rxjava,这样说也也是白说,让我们动手造个简单的。

先附上github源码

CustomObservable

为了对比rxjava,我把所有的类名前面都加了Custom表示自定义的意思

定义两个下游的接收事件的基类(观察者):

CustomEmitterCustomObserver ,其实二者是基本一样的接口

负责接收onStartonNextonCompleteonError事件

其中CustomObserver是给暴露给外界使用的,而是CustomEmitter封装在内部使用

CustomEmitter

// 下游接收事件,并负责暴露给外部的发射事件者 
public interface CustomEmitter<T> {
    
    void onNext(T value); 
 
    void onError(Throwable e); 
 
    void onComplete(); 
} 
CustomObserver
// 下游接收事件 
public interface CustomObserver<T> {
    
    void onStart(); 
 
    void onNext(T t); 
 
    void onError(Throwable e); 
 
    void onComplete(); 
} 

定义一个上游的执行回调事件的基类(被观察者):

CustomObservableSourceCustomObservableOnSubscribe

负责执行subscribe(耗时)方法,并发射(调用)onStartonNextonCompleteonError事件

其中CustomObservableOnSubscribe是给暴露给外界使用的,而CustomObservableSource是封装在内部使用

个人不喜欢观察者和被观察者的说法,总是分不清;

所以以下用上游执行者下游接收者

CustomObservableOnSubscribe
public interface CustomObservableOnSubscribe<T> {
    
    void subscribe(CustomEmitter<T> emitter); 
} 
CustomObservableSource
public interface CustomObservableSource<T> {
    
    void subscribe(CustomObserver<? super T> observer); 
} 
CustomObservable

CustomObservable 就是一个基类,负责创建各个对应的子类,这里的create()创建了CustomObservableCreate

public abstract class CustomObservable<T> implements CustomObservableSource {
    
 
    public static <T> CustomObservable<T> create(CustomObservableOnSubscribe<T> source) {
    
        return new CustomObservableCreate(source); 
    } 
     
   @Override 
    public void subscribe(CustomObserver observer) {
    
        subscribeActual(observer); 
    } 
 
    protected abstract void subscribeActual(CustomObserver observer); 
} 

这里之所以CustomObservableCreate继承CustomObservable而不是CustomObservableSource,是为了保证对外抛出去的是CustomObservable

  • CustomObservableCreate就是上游的执行事件者,负责封装一层CustomObservableOnSubscribe

  • CustomCreateEmitter就是下游的接收事件者,负责封装一层CustomObserver

// 上游封装了subscriber 
public class CustomObservableCreate<T> extends CustomObservable {
    
    private CustomObservableOnSubscribe<T> subscriber; 
 
    public CustomObservableCreate(CustomObservableOnSubscribe<T> subscriber) {
    
        this.subscriber = subscriber; 
    } 
 
    @Override 
    protected void subscribeActual(CustomObserver observer) {
    
        CustomCreateEmitter emitter = new CustomCreateEmitter<T>(observer); 
        observer.onStart(); 
        // 真正执行耗时方法 
        subscriber.subscribe(emitter); 
    } 
 
    // 下游封装了CustomObserver 
    private static class CustomCreateEmitter<T> implements CustomEmitter<T> {
    
        private CustomObserver<? super T> observer; 
 
        CustomCreateEmitter(CustomObserver<? super T> observer) {
    
            this.observer = observer; 
        } 
 
        @Override 
        public void onNext(T o) {
    
            observer.onNext(o); 
        } 
 
        @Override 
        public void onError(Throwable e) {
    
            observer.onError(e); 
        } 
 
        @Override 
        public void onComplete() {
    
            observer.onComplete(); 
        } 
    } 
} 
测试一下
public void testCreate() {
    
   CustomObservable.create(new CustomObservableOnSubscribe<String>() {
    
       @Override 
       public void subscribe(CustomEmitter<String> emitter) {
    
           emitter.onNext("test create"); 
           emitter.onComplete(); 
       } 
   }).subscribe(ExampleUnitTest.<String>getObserver()); 
} 
 
public static <T> CustomObserver getObserver() {
    
    CustomObserver<T> observer = new CustomObserver<T>() {
    
        @Override 
        public void onStart() {
    
            System.out.println("==== start " + Thread.currentThread() + " ===="); 
        } 
 
        @Override 
        public void onNext(T t) {
    
            System.out.println(Thread.currentThread() + " next: " + t); 
        } 
 
        @Override 
        public void onError(Throwable e) {
    
            System.out.println(Thread.currentThread() + " error: " + e); 
        } 
 
        @Override 
        public void onComplete() {
    
            System.out.println("==== " + Thread.currentThread() + " complete ==== /n"); 
        } 
    }; 
    return observer; 
} 
测试结果:

==== start Thread[main,5,main] ====
Thread[main,5,main] next: test create
==== Thread[main,5,main] complete ====

操作符 map

rxjava的强大有一方面就在于它丰富的操作符,其中常用之一的就是map

map的作用是在下游的接收事件者(观察者),将返回的结果进行转换映射

定义一个CustomFunction负责数据转换

public interface CustomFunction<T, R> {
    
    R apply(T t); 
} 

CustomObservable定义多一个map的静态方法

public <R> CustomObservable<R> map(CustomFunction<T, R> function) {
    
     return new CustomObservableMap(this, function); 
} 

CustomObservableMap

public class CustomObservableMap<R, T> extends CustomObservable {
    
    private CustomObservableSource<T> source; 
    private CustomFunction<T, R> mapper; 
 
    public CustomObservableMap(CustomObservableSource<T> source, CustomFunction<T, R> mapper) {
    
        this.source = source; 
        this.mapper = mapper; 
    } 
 
    @Override 
    protected void subscribeActual(CustomObserver observer) {
    
        CustomMapObserver<T, R> mapObserver = new CustomMapObserver(observer, mapper); 
        source.subscribe(mapObserver); 
    } 
 
    private static class CustomMapObserver<T, R> implements CustomObserver<T> {
    
        private CustomObserver<R> observer; 
        private CustomFunction<T, R> function; 
 
        public CustomMapObserver(CustomObserver<R> observer, CustomFunction<T, R> function) {
    
            this.observer = observer; 
            this.function = function; 
        } 
 
        @Override 
        public void onStart() {
    
            observer.onStart(); 
        } 
 
        @Override 
        public void onNext(T result) {
    
            // 做结果数据转换映射 
            observer.onNext(function.apply(result)); 
        } 
 
        @Override 
        public void onError(Throwable e) {
    
            observer.onError(e); 
        } 
 
        @Override 
        public void onComplete() {
    
            observer.onComplete(); 
        } 
    } 
} 

测试一下

public void testMap() {
    
    CustomObservable.create(new CustomObservableOnSubscribe<String>() {
    
        @Override 
        public void subscribe(CustomEmitter<String> emitter) {
    
            emitter.onNext("test create"); 
            emitter.onComplete(); 
        } 
    }).map(new CustomFunction<String, String>() {
    
        @Override 
        public String apply(String s) {
    
            return "test map " + s; 
        } 
    }).subscribe(ExampleUnitTest.<String>getObserver()); 
} 

测试结果

==== start Thread[main,5,main] ====
Thread[main,5,main] next: test map test create
==== Thread[main,5,main] complete ====

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/app/6253.html

(0)
上一篇 2021年7月17日 00:44
下一篇 2021年7月17日 00:44

相关推荐

发表回复

登录后才能评论