目录
背景
之所以想要自己动手写一个简单的Rxjava,并不是想证明自己多厉害,而是借助动手来理解Rxjava的思想和源码,实话实说,Rxjava是我看过的源码里面名字取得最迷糊的。看了三四天,硬是把自己搞迷糊了。
相反,我尝试自己做一个简单的轮子,发现其实Rxjava的整体思想和实现并不难,但层层的封装真是让人抓不住头脑。不过,只要抓住规律,其实理解起来还是挺简单的。
重点在于分清楚:上游发射事件,下游接收事件
哈哈,我并不是在说废话,只要分清楚哪些操作符是作用在上游,哪些作用在下游,在此基础上对上游或者下游封装多一层就成了Rxjava,这样说也也是白说,让我们动手造个简单的。
先附上github源码
CustomObservable
为了对比rxjava,我把所有的类名前面都加了Custom
表示自定义的意思
定义两个下游的接收事件的基类(观察者):
CustomEmitter
和CustomObserver
,其实二者是基本一样的接口
负责接收onStart
、onNext
、onComplete
、onError
事件
其中CustomObserver是给暴露给外界使用的,而是CustomEmitter封装在内部使用
CustomEmitter
// 下游接收事件,并负责暴露给外部的发射事件者
public interface CustomEmitter<T> {
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
CustomObserver
// 下游接收事件
public interface CustomObserver<T> {
void onStart();
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
定义一个上游的执行回调事件的基类(被观察者):
CustomObservableSource
和CustomObservableOnSubscribe
负责执行subscribe
(耗时)方法,并发射(调用)onStart
、onNext
、onComplete
、onError
事件
其中CustomObservableOnSubscribe是给暴露给外界使用的,而CustomObservableSource是封装在内部使用
个人不喜欢观察者和被观察者的说法,总是分不清;
所以以下用上游执行者和下游接收者
CustomObservableOnSubscribe
public interface CustomObservableOnSubscribe<T> {
void subscribe(CustomEmitter<T> emitter);
}
CustomObservableSource
public interface CustomObservableSource<T> {
void subscribe(CustomObserver<? super T> observer);
}
CustomObservable
CustomObservable 就是一个基类,负责创建各个对应的子类,这里的create()创建了CustomObservableCreate
public abstract class CustomObservable<T> implements CustomObservableSource {
public static <T> CustomObservable<T> create(CustomObservableOnSubscribe<T> source) {
return new CustomObservableCreate(source);
}
@Override
public void subscribe(CustomObserver observer) {
subscribeActual(observer);
}
protected abstract void subscribeActual(CustomObserver observer);
}
这里之所以CustomObservableCreate继承CustomObservable而不是CustomObservableSource,是为了保证对外抛出去的是CustomObservable
-
CustomObservableCreate就是上游的执行事件者,负责封装一层CustomObservableOnSubscribe
-
CustomCreateEmitter就是下游的接收事件者,负责封装一层CustomObserver
// 上游封装了subscriber
public class CustomObservableCreate<T> extends CustomObservable {
private CustomObservableOnSubscribe<T> subscriber;
public CustomObservableCreate(CustomObservableOnSubscribe<T> subscriber) {
this.subscriber = subscriber;
}
@Override
protected void subscribeActual(CustomObserver observer) {
CustomCreateEmitter emitter = new CustomCreateEmitter<T>(observer);
observer.onStart();
// 真正执行耗时方法
subscriber.subscribe(emitter);
}
// 下游封装了CustomObserver
private static class CustomCreateEmitter<T> implements CustomEmitter<T> {
private CustomObserver<? super T> observer;
CustomCreateEmitter(CustomObserver<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T o) {
observer.onNext(o);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
}
测试一下
public void testCreate() {
CustomObservable.create(new CustomObservableOnSubscribe<String>() {
@Override
public void subscribe(CustomEmitter<String> emitter) {
emitter.onNext("test create");
emitter.onComplete();
}
}).subscribe(ExampleUnitTest.<String>getObserver());
}
public static <T> CustomObserver getObserver() {
CustomObserver<T> observer = new CustomObserver<T>() {
@Override
public void onStart() {
System.out.println("==== start " + Thread.currentThread() + " ====");
}
@Override
public void onNext(T t) {
System.out.println(Thread.currentThread() + " next: " + t);
}
@Override
public void onError(Throwable e) {
System.out.println(Thread.currentThread() + " error: " + e);
}
@Override
public void onComplete() {
System.out.println("==== " + Thread.currentThread() + " complete ==== /n");
}
};
return observer;
}
测试结果:
==== start Thread[main,5,main] ====
Thread[main,5,main] next: test create
==== Thread[main,5,main] complete ====
操作符 map
rxjava的强大有一方面就在于它丰富的操作符,其中常用之一的就是map
map的作用是在下游的接收事件者(观察者),将返回的结果进行转换映射
定义一个CustomFunction负责数据转换
public interface CustomFunction<T, R> {
R apply(T t);
}
CustomObservable定义多一个map
的静态方法
public <R> CustomObservable<R> map(CustomFunction<T, R> function) {
return new CustomObservableMap(this, function);
}
CustomObservableMap
public class CustomObservableMap<R, T> extends CustomObservable {
private CustomObservableSource<T> source;
private CustomFunction<T, R> mapper;
public CustomObservableMap(CustomObservableSource<T> source, CustomFunction<T, R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(CustomObserver observer) {
CustomMapObserver<T, R> mapObserver = new CustomMapObserver(observer, mapper);
source.subscribe(mapObserver);
}
private static class CustomMapObserver<T, R> implements CustomObserver<T> {
private CustomObserver<R> observer;
private CustomFunction<T, R> function;
public CustomMapObserver(CustomObserver<R> observer, CustomFunction<T, R> function) {
this.observer = observer;
this.function = function;
}
@Override
public void onStart() {
observer.onStart();
}
@Override
public void onNext(T result) {
// 做结果数据转换映射
observer.onNext(function.apply(result));
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
}
测试一下
public void testMap() {
CustomObservable.create(new CustomObservableOnSubscribe<String>() {
@Override
public void subscribe(CustomEmitter<String> emitter) {
emitter.onNext("test create");
emitter.onComplete();
}
}).map(new CustomFunction<String, String>() {
@Override
public String apply(String s) {
return "test map " + s;
}
}).subscribe(ExampleUnitTest.<String>getObserver());
}
测试结果
==== start Thread[main,5,main] ====
Thread[main,5,main] next: test map test create
==== Thread[main,5,main] complete ====
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/6253.html