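Preface
The previous article implemented RxJava's basic Observable and the map operator. This one implements RxJava's most important feature, thread switching, along with several more complex operators: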
- subscribeOn()
- observeOn()
- from()
- zip()
- flatMap()
The full source code is available on GitHub.
subscribeOn()
subscribeOn() acts on the upstream emission. First, define a CustomScheduler that provides an interface for executing tasks.
import java.util.concurrent.Executor;

public class CustomScheduler {
    private final Executor executor;

    public CustomScheduler(Executor executor) {
        this.executor = executor;
    }

    public CustomWorker createWorker() {
        return new CustomWorker(executor);
    }

    public static class CustomWorker {
        private final Executor executor;

        public CustomWorker(Executor executor) {
            this.executor = executor;
        }

        // Hands the task to the underlying Executor.
        public void schedule(Runnable runnable) {
            executor.execute(runnable);
        }
    }
}
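We can define all kinds of CustomScheduler instances, each specifying the thread or thread pool that runs the work. We can also build a Scheduler that runs on the main thread, which gives the same effect as AndroidSchedulers.mainThread().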
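To make the idea of "different schedulers" concrete, here is a minimal sketch. It repeats the CustomScheduler class so that it compiles on its own; the SchedulerDemo class, the threadNameOf helper, and the thread name "custom-io" are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class SchedulerDemo {
    // Same shape as the article's CustomScheduler, repeated so the sketch is self-contained.
    static class CustomScheduler {
        private final Executor executor;
        CustomScheduler(Executor executor) { this.executor = executor; }
        CustomWorker createWorker() { return new CustomWorker(executor); }

        static class CustomWorker {
            private final Executor executor;
            CustomWorker(Executor executor) { this.executor = executor; }
            void schedule(Runnable runnable) { executor.execute(runnable); }
        }
    }

    // Hypothetical helper: schedules one task and reports which thread ran it.
    static String threadNameOf(CustomScheduler scheduler) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        scheduler.createWorker().schedule(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.get();
    }

    public static void main(String[] args) {
        // Runs each task synchronously on the calling thread.
        CustomScheduler immediate = new CustomScheduler(Runnable::run);
        // Runs every task on one dedicated background thread.
        ExecutorService single = Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-io"));
        CustomScheduler background = new CustomScheduler(single);

        System.out.println(threadNameOf(immediate));   // the caller's thread name
        System.out.println(threadNameOf(background));  // prints "custom-io"
        single.shutdown();
    }
}
```

On Android, a main-thread scheduler could presumably be built the same way, by passing an Executor that posts each Runnable to a Handler bound to the main Looper; that variant is not shown here because it needs the Android SDK.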
Next, add a subscribeOn() method to CustomObservable:
// CustomObservable
public CustomObservable<T> subscribeOn(CustomScheduler scheduler) {
    return new CustomObservableSubscribeOn<T>(this, scheduler);
}
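As in the previous article, we create a CustomObservableSubscribeOn that wraps the upstream and the downstream. Its implementation is simple: it only moves the upstream's execution (the subscription itself) onto the CustomScheduler's thread pool. The downstream Observer needs no special handling.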
class CustomObservableSubscribeOn<T> extends CustomObservable<T> {
    private final CustomObservableSource<T> source;
    private final CustomScheduler scheduler;

    public CustomObservableSubscribeOn(CustomObservableSource<T> source, CustomScheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final CustomObserver observer) {
        final CustomSubscribeOnObserver<T> subscribeOnObserver = new CustomSubscribeOnObserver<T>(observer);
        CustomScheduler.CustomWorker worker = scheduler.createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                // Subscribe to the upstream on the CustomScheduler's thread
                source.subscribe(subscribeOnObserver);
            }
        });
    }

    // Pass-through observer: forwards every event to the downstream unchanged.
    private static final class CustomSubscribeOnObserver<T> implements CustomObserver<T> {
        final CustomObserver<? super T> actual;

        CustomSubscribeOnObserver(CustomObserver<? super T> actual) {
            this.actual = actual;
        }

        @Override
        public void onStart() {
            actual.onStart();
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable error) {
            actual.onError(error);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }
}
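The effect of subscribeOn() can be checked with a small standalone sketch. It uses simplified stand-ins rather than the article's classes: the Observer and Source interfaces (without onStart/onError), the static subscribeOn helper, and the thread name "upstream-thread" are all assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class SubscribeOnDemo {
    // Simplified stand-ins for CustomObserver / CustomObservableSource.
    interface Observer<T> { void onNext(T t); void onComplete(); }
    interface Source<T> { void subscribe(Observer<T> observer); }

    // Wraps a source so that subscribing to it (and hence its emission) runs on the executor.
    static <T> Source<T> subscribeOn(Source<T> upstream, Executor executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    // Returns the name of the thread on which the upstream emitted its value.
    static String emissionThread(Executor executor) {
        AtomicReference<String> emittedOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Source<Integer> upstream = observer -> {
            emittedOn.set(Thread.currentThread().getName());
            observer.onNext(1);
            observer.onComplete();
        };
        subscribeOn(upstream, executor).subscribe(new Observer<Integer>() {
            @Override public void onNext(Integer value) { }
            @Override public void onComplete() { done.countDown(); }
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emittedOn.get();
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "upstream-thread"));
        System.out.println("emitted on: " + emissionThread(io)); // prints "emitted on: upstream-thread"
        io.shutdown();
    }
}
```

Because nothing switches threads afterwards, the downstream callbacks in this sketch also run on the upstream thread.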
observeOn()
In essence it works the same way as subscribeOn(). The difference is that observeOn() acts on the downstream observer.
Add an observeOn() method:
// CustomObservable
public CustomObservable<T> observeOn(CustomScheduler scheduler) {
    return new CustomObservableObserveOn<T>(this, scheduler);
}
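Next, create the CustomObservableObserveOn class. It only needs to hand the callbacks (onNext and the others) to the CustomScheduler's thread pool. Note that events keep their order only if the underlying Executor runs tasks sequentially, for example a single-threaded executor.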
class CustomObservableObserveOn<T> extends CustomObservable<T> {
    private final CustomObservableSource<T> source;
    private final CustomScheduler scheduler;

    public CustomObservableObserveOn(CustomObservableSource<T> source, CustomScheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        CustomScheduler.CustomWorker worker = scheduler.createWorker();
        CustomObserverObserveOn<T> observerObserveOn = new CustomObserverObserveOn<T>(observer, worker);
        source.subscribe(observerObserveOn);
    }

    // Re-dispatches every downstream callback through the worker.
    private static class CustomObserverObserveOn<T> implements CustomObserver<T> {
        private final CustomObserver<T> observer;
        private final CustomScheduler.CustomWorker worker;

        public CustomObserverObserveOn(CustomObserver<T> observer, CustomScheduler.CustomWorker worker) {
            this.observer = observer;
            this.worker = worker;
        }

        @Override
        public void onStart() {
            this.worker.schedule(new Runnable() {
                @Override
                public void run() {
                    observer.onStart();
                }
            });
        }

        @Override
        public void onNext(final T t) {
            this.worker.schedule(new Runnable() {
                @Override
                public void run() {
                    observer.onNext(t);
                }
            });
        }

        @Override
        public void onError(final Throwable e) {
            this.worker.schedule(new Runnable() {
                @Override
                public void run() {
                    observer.onError(e);
                }
            });
        }

        @Override
        public void onComplete() {
            this.worker.schedule(new Runnable() {
                @Override
                public void run() {
                    observer.onComplete();
                }
            });
        }
    }
}
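The following sketch illustrates what observeOn() changes: the upstream emits on the calling thread, while the downstream receives every event on the scheduler's thread. The Observer and Source interfaces, the static observeOn helper, and the thread name "observer-thread" are simplified assumptions for this illustration, not the article's actual classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ObserveOnDemo {
    // Simplified stand-ins for CustomObserver / CustomObservableSource.
    interface Observer<T> { void onNext(T t); void onComplete(); }
    interface Source<T> { void subscribe(Observer<T> observer); }

    // Re-dispatches every downstream callback onto the executor.
    static <T> Source<T> observeOn(Source<T> upstream, Executor executor) {
        return downstream -> upstream.subscribe(new Observer<T>() {
            @Override public void onNext(T t) { executor.execute(() -> downstream.onNext(t)); }
            @Override public void onComplete() { executor.execute(downstream::onComplete); }
        });
    }

    // Emits 1, 2, 3 on the caller's thread and records "threadName:value" for each delivery.
    static List<String> deliveries(Executor executor) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        Source<Integer> upstream = observer -> {
            for (int i = 1; i <= 3; i++) {
                observer.onNext(i);
            }
            observer.onComplete();
        };
        observeOn(upstream, executor).subscribe(new Observer<Integer>() {
            @Override public void onNext(Integer v) { log.add(Thread.currentThread().getName() + ":" + v); }
            @Override public void onComplete() { done.countDown(); }
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // A single-threaded executor runs tasks in submission order, so events stay ordered.
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "observer-thread"));
        System.out.println(deliveries(ui));
        // prints [observer-thread:1, observer-thread:2, observer-thread:3]
        ui.shutdown();
    }
}
```

With a multi-threaded pool the same code could deliver events out of order, which is one reason a sequential executor is used in this sketch.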
from()
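RxJava's fromIterable operator emits the items of a list one by one.
How can we implement a simple Observable that wraps multiple values? It is not hard: once subscribe() has been called, call onNext() repeatedly to emit the data.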
// CustomObservable
public static <T> CustomObservable<T> from(Iterable<T> values) {
    return new CustomObservableIterable<>(values);
}
Next, create CustomObservableIterable:
class CustomObservableIterable<T> extends CustomObservable<T> {
    private final Iterable<T> valueIter;

    public CustomObservableIterable(Iterable<T> valueIter) {
        this.valueIter = valueIter;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        CustomIterableObserver<T> iterableObserver = new CustomIterableObserver<>(valueIter, observer);
        CustomIterableSource source = new CustomIterableSource();
        source.subscribe(iterableObserver);
    }

    // Trigger source: emits a single placeholder onNext(null) that starts the iteration.
    private class CustomIterableSource implements CustomObservableSource {
        @Override
        public void subscribe(CustomObserver observer) {
            observer.onStart();
            observer.onNext(null);
            observer.onComplete();
        }
    }

    private static class CustomIterableObserver<T> implements CustomObserver<T> {
        private final Iterable<T> valueIter;
        private final CustomObserver<T> observer;

        CustomIterableObserver(Iterable<T> valueIter, CustomObserver<T> observer) {
            this.valueIter = valueIter;
            this.observer = observer;
        }

        @Override
        public void onStart() {
            this.observer.onStart();
        }

        @Override
        public void onNext(T t) {
            // The argument is only the trigger; the real values come from the Iterable.
            for (T value : valueIter) {
                this.observer.onNext(value);
            }
        }

        @Override
        public void onError(Throwable e) {
            this.observer.onError(e);
        }

        @Override
        public void onComplete() {
            this.observer.onComplete();
        }
    }
}
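The same behavior can be written more directly by iterating inside subscribe(), without the placeholder onNext(null). The sketch below is one possible alternative, not the article's implementation; the Observer and Source interfaces, the collect helper, and the choice to route a RuntimeException to onError are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FromIterableDemo {
    // Simplified stand-ins for CustomObserver / CustomObservableSource.
    interface Observer<T> {
        void onStart();
        void onNext(T t);
        void onError(Throwable e);
        void onComplete();
    }
    interface Source<T> { void subscribe(Observer<T> observer); }

    // Emits every element of the Iterable, then completes.
    static <T> Source<T> from(Iterable<T> values) {
        return observer -> {
            observer.onStart();
            try {
                for (T value : values) {
                    observer.onNext(value);
                }
            } catch (RuntimeException e) {
                // Assumption: a failure while iterating or delivering ends the stream with onError.
                observer.onError(e);
                return;
            }
            observer.onComplete();
        };
    }

    // Hypothetical helper: subscribes and records each event as a string.
    static <T> List<String> collect(Source<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new Observer<T>() {
            @Override public void onStart() { events.add("start"); }
            @Override public void onNext(T t) { events.add("next:" + t); }
            @Override public void onError(Throwable e) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        List<String> events = collect(from(Arrays.asList("a", "b", "c")));
        System.out.println(events); // prints [start, next:a, next:b, next:c, complete]
    }
}
```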
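zip()
Online explanations make zip sound very complicated, and I never quite followed them. In practice zip is simple to use: it combines the results emitted by several upstreams and delivers the combined values to a single downstream observer. Note, however, that the values from the upstreams are combined pairwise, by position.
Task A produces [1, 2, 3].
Task B produces [1, 2].
If the combining rule is addition, what is the final result? It is [2, 4].
Because B has no result to pair with A's 3, A's 3 is discarded.
Implementing zip is more involved. As before, start with a public static method: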
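The pairing rule above can be reproduced with plain lists. This is only a sketch of the semantics, not of the Observable-based implementation; the ZipPairingDemo class and the zipLists helper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

class ZipPairingDemo {
    // Combines elements position by position and stops when the shorter list runs out.
    static <A, B, R> List<R> zipLists(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            out.add(combiner.apply(l.next(), r.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 3);
        List<Integer> b = Arrays.asList(1, 2);
        List<Integer> sums = zipLists(a, b, (x, y) -> x + y);
        System.out.println(sums); // prints [2, 4]; A's 3 has no partner and is dropped
    }
}
```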
// CustomObservable
// Raw types are used below because the two sources may have different element types.
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T, U, R> CustomObservable<R> zip(final CustomObservableSource<T> o1,
                                                final CustomObservableSource<U> o2,
                                                CustomBiFunction<T, U, R> mapper) {
    List<CustomObservableSource<?>> list = Arrays.asList(o1, o2);
    CustomFunction<Object[], R> arrayFunc = new CustomFunctions.Array2Func(mapper);
    return new CustomObservableZip(list, arrayFunc);
}

public interface CustomBiFunction<T, U, R> {
    R apply(T t, U u);
}
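We convert the CustomBiFunction into a CustomFunction<Object[], R>, which is more general: put simply, it represents turning the values from multiple CustomObservableSources into a single result R. For how the conversion is done, see the GitHub source mentioned above.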
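For readers without the repository at hand, an adapter of this kind might look roughly like the sketch below. This is a guess at its shape, not the actual CustomFunctions.Array2Func from the GitHub source; the interfaces are redeclared locally and the length check is an added assumption.

```java
class Array2FuncDemo {
    // Local copies of the article's functional interfaces so the sketch compiles on its own.
    interface CustomFunction<T, R> { R apply(T t); }
    interface CustomBiFunction<T, U, R> { R apply(T t, U u); }

    // Assumed adapter: unpacks a two-element Object[] and delegates to the two-argument mapper.
    static final class Array2Func<T, U, R> implements CustomFunction<Object[], R> {
        private final CustomBiFunction<T, U, R> mapper;

        Array2Func(CustomBiFunction<T, U, R> mapper) {
            this.mapper = mapper;
        }

        @Override
        @SuppressWarnings("unchecked")
        public R apply(Object[] values) {
            if (values.length != 2) {
                throw new IllegalArgumentException("expected 2 values, got " + values.length);
            }
            // The casts are unchecked; the caller must supply values of the right types.
            return mapper.apply((T) values[0], (U) values[1]);
        }
    }

    public static void main(String[] args) {
        CustomFunction<Object[], Integer> add =
                new Array2Func<Integer, Integer, Integer>((x, y) -> x + y);
        System.out.println(add.apply(new Object[] {1, 2})); // prints 3
    }
}
```

Working with Object[] loses compile-time type checking, but it lets one coordinator handle any number of sources with a single mapper type.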
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class CustomObservableZip<T, U, R> extends CustomObservable<R> {
    final List<CustomObservableSource<T>> sources;
    final CustomFunction<Object[], R> mapper;

    public CustomObservableZip(List<CustomObservableSource<T>> sources, CustomFunction<Object[], R> mapper) {
        this.sources = sources;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        ZipCoordinator<T, R> zipCoordinator = new ZipCoordinator<T, R>(observer, sources, mapper);
        zipCoordinator.subscribe();
    }

    // Not thread-safe: assumes all sources emit on the same thread.
    static final class ZipCoordinator<T, R> {
        final CustomObserver<R> actual;
        final List<CustomObservableSource<T>> sources;
        final List<ZipObserver<T, R>> observers;
        final CustomFunction<Object[], R> mapper;
        final int size;
        boolean isFinish;

        ZipCoordinator(CustomObserver<R> observer,
                       List<CustomObservableSource<T>> sources,
                       CustomFunction<Object[], R> mapper) {
            this.actual = observer;
            this.sources = sources;
            this.mapper = mapper;
            this.size = sources.size();
            this.observers = new ArrayList<>(size);
            this.isFinish = false;
        }

        public void subscribe() {
            actual.onStart();
            // Create all observers first so drain() always sees the full list.
            for (int i = 0; i < size; i++) {
                observers.add(new ZipObserver<T, R>(this));
            }
            for (int i = 0; i < size; i++) {
                sources.get(i).subscribe(observers.get(i));
            }
        }

        void drain() {
            if (isFinish) {
                return;
            }
            boolean canMerge = true;
            boolean isDone = true;
            for (ZipObserver<T, R> observer : observers) {
                if (observer.error != null) {
                    // Forward the first error downstream and stop.
                    isFinish = true;
                    actual.onError(observer.error);
                    return;
                }
                if (!observer.isDone) {
                    isDone = false;
                }
                if (observer.queue.isEmpty()) {
                    canMerge = false;
                }
            }
            if (canMerge) {
                // Every source has a pending value: take one from each and combine them.
                List<T> mergeList = new ArrayList<>(size);
                for (ZipObserver<T, R> observer : observers) {
                    mergeList.add(observer.queue.poll());
                }
                actual.onNext(mapper.apply(mergeList.toArray()));
            }
            if (isDone) {
                // All sources have finished: complete exactly once.
                isFinish = true;
                actual.onComplete();
            }
        }
    }

    static class ZipObserver<T, R> implements CustomObserver<T> {
        boolean isDone;
        final ZipCoordinator<T, R> parent;
        final Queue<T> queue;
        Throwable error;

        public ZipObserver(ZipCoordinator<T, R> parent) {
            this.parent = parent;
            this.queue = new LinkedList<>();
            this.isDone = false;
        }

        @Override
        public void onStart() {
        }

        @Override
        public void onNext(T o) {
            queue.add(o);
            parent.drain();
        }

        @Override
        public void onError(Throwable e) {
            isDone = true;
            error = e;
            parent.drain();
        }

        @Override
        public void onComplete() {
            isDone = true;
            parent.drain();
        }
    }
}
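RxJava's actual zip implementation is considerably more complex than the one above.
To summarize my approach: each CustomObservableSource gets its own ZipObserver, which stores that source's results in an internal queue. Each time a source finishes a task and calls onNext, we check whether every ZipObserver's queue holds a result. If so, we take one result from each, combine them, and emit the combined value.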
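The queue-per-source idea can be isolated in a few lines. The sketch below strips away the Observable plumbing and keeps only the drain step; the Zipper class and its methods are hypothetical names, and like the code above it is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

class ZipDrainDemo {
    // One queue per source; a combined value is emitted once every queue is non-empty.
    static final class Zipper<R> {
        private final List<Queue<Object>> queues = new ArrayList<>();
        private final Function<Object[], R> mapper;
        private final List<R> emitted = new ArrayList<>();

        Zipper(int sourceCount, Function<Object[], R> mapper) {
            this.mapper = mapper;
            for (int i = 0; i < sourceCount; i++) {
                queues.add(new ArrayDeque<>());
            }
        }

        // Plays the role of ZipObserver.onNext for the source at the given index.
        void onNext(int index, Object value) {
            queues.get(index).add(value);
            drain();
        }

        private void drain() {
            for (Queue<Object> q : queues) {
                if (q.isEmpty()) {
                    return; // at least one source has nothing to pair yet
                }
            }
            Object[] row = new Object[queues.size()];
            for (int i = 0; i < row.length; i++) {
                row[i] = queues.get(i).poll();
            }
            emitted.add(mapper.apply(row));
        }

        List<R> emitted() {
            return emitted;
        }
    }

    // Feeds A = [1, 2, 3] and B = [1, 2] in an interleaved order.
    static List<Integer> run() {
        Zipper<Integer> zipper = new Zipper<Integer>(2, row -> (Integer) row[0] + (Integer) row[1]);
        zipper.onNext(0, 1);
        zipper.onNext(1, 1); // both queues filled: emits 2
        zipper.onNext(0, 2);
        zipper.onNext(0, 3);
        zipper.onNext(1, 2); // pairs with A's 2: emits 4; A's 3 stays queued
        return zipper.emitted();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 4]
    }
}
```

Since each onNext adds exactly one value, at most one new combined value can become available per call, which is why a single pass in drain() is enough here.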
flatMap()
// CustomObservable
public <R> CustomObservable<R> flatMap(CustomFunction<T, CustomObservableSource<R>> function) {
    return new CustomObservableFlatMap<T, R>(this, function);
}
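The difference between flatMap and map is that the former converts each value into an Observable, whereas the latter converts it into a value of another type.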
public class CustomObservableFlatMap<T, R> extends CustomObservable<R> {
    private final CustomObservableSource<T> source;
    private final CustomFunction<T, CustomObservableSource<R>> mapper;

    public CustomObservableFlatMap(CustomObservableSource<T> source, CustomFunction<T, CustomObservableSource<R>> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(CustomObserver observer) {
        CustomFlatMapObserver<T, R> flatMapObserver = new CustomFlatMapObserver<T, R>(observer, mapper);
        source.subscribe(flatMapObserver);
    }

    private static class CustomFlatMapObserver<T, R> implements CustomObserver<T> {
        private final CustomObserver<R> observer;
        private final CustomFunction<T, CustomObservableSource<R>> mapper;

        public CustomFlatMapObserver(CustomObserver<R> observer, CustomFunction<T, CustomObservableSource<R>> mapper) {
            this.observer = observer;
            this.mapper = mapper;
        }

        @Override
        public void onStart() {
            observer.onStart();
        }

        @Override
        public void onNext(T t) {
            // Map the value to an inner source and forward that source's values downstream.
            CustomObservableSource<R> source = mapper.apply(t);
            InnerObserver<R> innerObserver = new InnerObserver<>(observer);
            source.subscribe(innerObserver);
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            // Simplification: completes with the outer source; correct only if inner sources emit synchronously.
            observer.onComplete();
        }

        private static class InnerObserver<R> implements CustomObserver<R> {
            private final CustomObserver<R> observer;

            InnerObserver(CustomObserver<R> observer) {
                this.observer = observer;
            }

            @Override
            public void onStart() {
                // The downstream was already started by the outer observer.
            }

            @Override
            public void onNext(R result) {
                observer.onNext(result);
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onComplete() {
                // Inner completion must not complete the downstream.
            }
        }
    }
}
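A short standalone sketch shows the resulting event order. As before, the Observer and Source interfaces and the static from/flatMap helpers are simplified stand-ins introduced for this illustration; they mirror the structure above but are not the article's classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FlatMapDemo {
    // Simplified stand-ins for CustomObserver / CustomObservableSource.
    interface Observer<T> { void onNext(T t); void onComplete(); }
    interface Source<T> { void subscribe(Observer<T> observer); }

    static <T> Source<T> from(List<T> values) {
        return observer -> {
            for (T v : values) {
                observer.onNext(v);
            }
            observer.onComplete();
        };
    }

    // Maps each upstream value to an inner source and forwards the inner values downstream.
    static <T, R> Source<R> flatMap(Source<T> upstream, Function<T, Source<R>> mapper) {
        return downstream -> upstream.subscribe(new Observer<T>() {
            @Override public void onNext(T t) {
                mapper.apply(t).subscribe(new Observer<R>() {
                    @Override public void onNext(R r) { downstream.onNext(r); }
                    @Override public void onComplete() { } // inner completion is not forwarded
                });
            }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        Source<Integer> numbers = from(Arrays.asList(1, 2));
        Function<Integer, Source<String>> expand = n -> from(Arrays.asList(n + "a", n + "b"));
        Source<String> result = flatMap(numbers, expand);
        result.subscribe(new Observer<String>() {
            @Override public void onNext(String s) { out.add(s); }
            @Override public void onComplete() { out.add("complete"); }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1a, 1b, 2a, 2b, complete]
    }
}
```

Everything here is synchronous, so the output order is deterministic. If inner sources emitted on other threads, the outer onComplete could arrive before they finish; a fuller implementation would need to track the active inner sources before completing.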
Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/6252.html