Android 写一个属于自己的Rxjava(二)详解手机开发

目录

Android 写一个属于自己的Rxjava(一)

Android 写一个属于自己的Rxjava(二)

前言

上一篇实现了Rxjava基本的Observable和map操作符的实现,接下来需要实现Rxjava最重要的线程切换和复杂的操作符:

  • subscribeOn()
  • observeOn()
  • from()
  • zip()
  • flatmap()

先附上github源码

subscribeOn()

subscribeOn()作用在上游的发射,先定义一个CustomScheduler,提供执行任务的接口。

public class CustomScheduler {
    
    private final Executor executor; 
 
    public CustomScheduler(Executor executor) {
    
        this.executor = executor; 
    } 
 
    public CustomWorker createWorker() {
    
        return new CustomWorker(executor); 
    } 
 
    public static class CustomWorker{
    
        private final Executor executor; 
        public CustomWorker(Executor executor) {
    
            this.executor = executor; 
        } 
 
        public void schedule(Runnable runnable) {
    
            executor.execute(runnable); 
        } 
    } 
} 

我们可以定义多种多样的CustomScheduler,指定执行在什么线程或者线程池。我们还可以造一个执行在主线程的Scheduler,就可以达到AndroidSchedulers.mainThread()一样的效果。

继续在CustomObservable中提供subscribeOn()的方法:

    // CustomObservable 
    public CustomObservable<T> subscribeOn(CustomScheduler scheduler) {
    
        return new CustomObservableSubscribeOn(this, scheduler); 
    } 

跟上篇文章一样,生成了CustomObservableSubscribeOn来封装上游和下游。CustomObservableSubscribeOn的实现也很简单,只是将上游的执行扔进CustomScheduler线程池里面执行,下游Observer不需要做什么动作。

 
class CustomObservableSubscribeOn<T> extends CustomObservable<T> {
    
    private CustomObservableSource<T> source; 
    private CustomScheduler scheduler; 
 
    public CustomObservableSubscribeOn(CustomObservableSource<T> source, CustomScheduler scheduler) {
    
        this.source = source; 
        this.scheduler = scheduler; 
    } 
 
    @Override 
    protected void subscribeActual(final CustomObserver observer) {
    
        final CustomSubscribeOnObserver subscribeOnObserver = new CustomSubscribeOnObserver(observer); 
        CustomScheduler.CustomWorker worker = scheduler.createWorker(); 
        worker.schedule(new Runnable() {
    
            @Override 
            public void run() {
    
                // 将任务执行扔进CustomScheduler 
                source.subscribe(subscribeOnObserver); 
            } 
        }); 
    } 
 
    private static final class CustomSubscribeOnObserver<T>  implements CustomObserver<T> {
    
        final CustomObserver<? super T> actual; 
 
        CustomSubscribeOnObserver(CustomObserver<? super T> actual) {
    
            this.actual = actual; 
        } 
 
        @Override 
        public void onStart() {
    
            actual.onStart(); 
        } 
 
        @Override 
        public void onNext(T t) {
    
            actual.onNext(t); 
        } 
 
        @Override 
        public void onError(Throwable error) {
    
            actual.onError(error); 
        } 
 
        @Override 
        public void onComplete() {
    
            actual.onComplete(); 
        } 
    } 
} 

ObserveOn()

其实本质跟subscribeOn是一样的,区别在于ObserveOn()作用在下游的observer中。

提供ObserverOn()方法

    // CustomObservable 
    public CustomObservable<T> observeOn(CustomScheduler scheduler) {
    
        return new CustomObservableObserveOn(this, scheduler); 
    } 

继续新建CustomObservableObserveOn类,只需要将回调事件onNext等扔进CustomScheduler的线程池就完成任务了。

class CustomObservableObserveOn<T> extends CustomObservable<T> {
    
    private CustomObservableSource<T> source; 
    private CustomScheduler scheduler; 
 
    public CustomObservableObserveOn(CustomObservableSource source, CustomScheduler scheduler) {
    
        this.source = source; 
        this.scheduler = scheduler; 
    } 
 
    @Override 
    protected void subscribeActual(CustomObserver observer) {
    
        CustomScheduler.CustomWorker worker = scheduler.createWorker(); 
        CustomObserverObserveOn observerObserveOn = new CustomObserverObserveOn<T>(observer, worker); 
        source.subscribe(observerObserveOn); 
    } 
 
    private static class CustomObserverObserveOn<T> implements CustomObserver<T> {
    
        private CustomObserver<T> observer; 
        private CustomScheduler.CustomWorker worker; 
 
        public CustomObserverObserveOn(CustomObserver<T> observer, CustomScheduler.CustomWorker worker) {
    
            this.observer = observer; 
            this.worker = worker; 
        } 
 
        @Override 
        public void onStart() {
    
            this.worker.schedule(new Runnable() {
    
                @Override 
                public void run() {
    
                    observer.onStart(); 
                } 
            }); 
        } 
 
        @Override 
        public void onNext(final T t) {
    
            this.worker.schedule(new Runnable() {
    
                @Override 
                public void run() {
    
                    observer.onNext(t); 
                } 
            }); 
        } 
 
        @Override 
        public void onError(final Throwable e) {
    
            this.worker.schedule(new Runnable() {
    
                @Override 
                public void run() {
    
                    observer.onError(e); 
                } 
            }); 
        } 
 
        @Override 
        public void onComplete() {
    
            this.worker.schedule(new Runnable() {
    
                @Override 
                public void run() {
    
                    observer.onComplete(); 
                } 
            }); 
        } 
    } 
} 

from()

rxjava用fromIterable 操作符可以逐次发射list的中的数据。

怎么简单实现一个封装多个值的Observable。其实也不难,就是执行subscribeOn()后,多次调用onNext()发射数据。

   // CustomObservable 
    public static <T> CustomObservable<T> from(Iterable<T> values) {
    
        return new CustomObservableIterable<>(values); 
    } 

继续造CustomObservableIterable

class CustomObservableIterable<T> extends CustomObservable {
    
    private Iterable<T> valueIter; 
 
    public CustomObservableIterable(Iterable<T> valueIter) {
    
        this.valueIter = valueIter; 
    } 
 
    @Override 
    protected void subscribeActual(CustomObserver observer) {
    
        CustomIterableObserver<T> iterableObserver = new CustomIterableObserver<>(valueIter, observer); 
        CustomInterableSource source = new CustomInterableSource(); 
        source.subscribe(iterableObserver); 
    } 
 
    private class CustomInterableSource implements CustomObservableSource {
    
        @Override 
        public void subscribe(CustomObserver observer) {
    
            observer.onStart(); 
            observer.onNext(null); 
            observer.onComplete(); 
        } 
    } 
 
    private static class CustomIterableObserver<T> implements CustomObserver<T> {
    
        private Iterable<T> valueIter; 
        private CustomObserver<T> observer; 
 
        CustomIterableObserver(Iterable<T> valueIter, CustomObserver<T> observer) {
    
            this.valueIter = valueIter; 
            this.observer = observer; 
        } 
 
        @Override 
        public void onStart() {
    
            this.observer.onStart(); 
        } 
 
        @Override 
        public void onNext(T t) {
    
            for (T value : valueIter) {
    
                this.observer.onNext(value); 
            } 
        } 
 
        @Override 
        public void onError(Throwable e) {
    
            this.observer.onError(e); 
        } 
 
        @Override 
        public void onComplete() {
    
            this.observer.onComplete(); 
        } 
    } 
} 

zip

网上把zip说得好复杂,每次我都没看明白,其实zip用起来很简单,就是将多个上游的发射请求执行结果混合在一起,统一发射给同一个下游observer。但是要注意的是,多个上游的是一一对应混合的。

任务A的执行的结果是[1, 2, 3]
任务B的执行的结果是[1, 2]
混合规则是加法,那么最后的结果是什么?

结果是:[2, 4]
因为B没有结果跟A的3对应,所以抛弃了A的3。

zip的实现比较复杂,同样先提供一个对外的静态方法

// CustomObservable 
public static <T, U, R> CustomObservable<R> zip(final CustomObservableSource<T> o1, 
                                                    final CustomObservableSource<U> o2, 
                                                    CustomBiFunction<T, U, R> mapper) {
    
        List<CustomObservableSource<?>> list = Arrays.asList(o1, o2); 
        CustomFunction<Object[], R> arrayFunc = new CustomFunctions.Array2Func(mapper); 
        return new CustomObservableZip(list, arrayFunc); 
    } 
 
public interface CustomBiFunction<T, U, R> {
    
    R apply(T t, U u); 
} 

我们将CustomBitFunction转换成CustomFunction<Object[], R>,更有通配性,简单理解就是表示多个CustomObservableSource转换成R结果。至于如何转换,直接看上面的github源码。

public class CustomObservableZip<T, U, R> extends CustomObservable<T> {
    
    List<CustomObservableSource<T>> sources; 
    CustomFunction<Object[], R> mapper; 
 
    public CustomObservableZip(List<CustomObservableSource<T>> sources, CustomFunction<Object[], R> mapper) {
    
        this.sources = sources; 
        this.mapper = mapper; 
    } 
 
    @Override 
    protected void subscribeActual(CustomObserver observer) {
    
        ZipCoordinator zipCoordinator = new ZipCoordinator(observer, sources, mapper); 
        zipCoordinator.subscribe(); 
    } 
 
    static final class ZipCoordinator<T, R> {
    
        CustomObserver<R> actual; 
        List<CustomObservableSource<T>> sources; 
        List<ZipObserver<T, R>> observers; 
        CustomFunction<Object[], R> mapper; 
        int size; 
        boolean isFinish; 
 
        ZipCoordinator(CustomObserver<R> observer, 
                       List<CustomObservableSource<T>> sources, 
                       CustomFunction<Object[], R> mapper) {
    
            this.actual = observer; 
            this.sources = sources; 
            this.mapper = mapper; 
            this.size = sources.size(); 
            this.observers = new ArrayList<>(size); 
            this.isFinish = false; 
        } 
 
        public void subscribe() {
    
            actual.onStart(); 
            for (int i = 0; i<size; i++) {
    
                ZipObserver observer = new ZipObserver<T, R>(this); 
                observers.add(observer); 
            } 
            for (int i = 0; i<size; i++) {
    
                sources.get(i).subscribe(observers.get(i)); 
            } 
        } 
 
        void drain() {
    
            if (isFinish) {
    
                return; 
            } 
            boolean canMerge = true; 
            boolean isDone = true; 
            for (ZipObserver<T, R> observer: observers) {
    
                if (!observer.isDone) {
    
                    isDone = false; 
                } 
                if (observer.queue.isEmpty()) {
    
                    canMerge = false; 
                } 
            } 
            if (canMerge) {
    
                List<T> mergeList = new ArrayList<>(size); 
                for (ZipObserver<T, R> observer: observers) {
    
                    T t = observer.queue.poll(); 
                    mergeList.add(t); 
                } 
                actual.onNext(mapper.apply(mergeList.toArray())); 
            } 
            if (isDone) {
    
                actual.onComplete(); 
            } 
        } 
    } 
 
    static class ZipObserver<T, R> implements CustomObserver<T> {
    
        boolean isDone; 
        ZipCoordinator<T, R> parent; 
        Queue<T> queue; 
        Throwable error; 
 
        public ZipObserver(ZipCoordinator parent) {
    
            this.parent = parent; 
            this.queue = new LinkedList<>(); 
            this.isDone = false; 
        } 
 
        @Override 
        public void onStart() {
    
 
        } 
 
        @Override 
        public void onNext(T o) {
    
            queue.add(o); 
            parent.drain(); 
        } 
 
        @Override 
        public void onError(Throwable e) {
    
            isDone = true; 
            error = e; 
            parent.drain(); 
        } 
 
        @Override 
        public void onComplete() {
    
            isDone = true; 
            parent.drain(); 
        } 
    } 
} 

事实上Rxjava的zip实现比上面复杂多一些。
简单说下我的实现方式,就是为每一个CustomObservableSource提供一个ZipObserver,内部存储着自己的计算结果,每次执行完任务调用onNext的时候,就去看下是不是所有的zipObserver的队列都是有计算结果的,如果是,就将结果混合之后发射出去。

flatmap

    public <R> CustomObservable<R> flatMap(CustomFunction<T, CustomObservableSource<R>> function) {
    
        return new CustomObservableFlatMap(this, function); 
    } 

其实flatmap跟map的区别在于,前者是将值转换成一个Observable,而后者将值转换成另外种类型的值。

public class CustomObservableFlatMap<T, R> extends CustomObservable {
    
    private CustomObservableSource<T> source; 
    private CustomFunction<T, CustomObservableSource<R>> mapper; 
 
    public CustomObservableFlatMap(CustomObservableSource<T> source, CustomFunction<T, CustomObservableSource<R>> mapper) {
    
        this.source = source; 
        this.mapper = mapper; 
    } 
 
    @Override 
    protected void subscribeActual(CustomObserver observer) {
    
        CustomFlatMapObserver<T, R> flatMapObserver = new CustomFlatMapObserver(observer, mapper); 
        source.subscribe(flatMapObserver); 
    } 
 
    private static class CustomFlatMapObserver<T, R> implements CustomObserver<T> {
    
        private CustomObserver<R> observer; 
        private CustomFunction<T, CustomObservableSource<R>> mapper; 
 
        public CustomFlatMapObserver(CustomObserver<R> observer, CustomFunction<T, CustomObservableSource<R>> mapper) {
    
            this.observer = observer; 
            this.mapper = mapper; 
        } 
 
        @Override 
        public void onStart() {
    
            observer.onStart(); 
        } 
 
        @Override 
        public void onNext(T t) {
    
            CustomObservableSource<R> source = mapper.apply(t); 
            InnerObserver<R> innerObserver = new InnerObserver<>(observer); 
            source.subscribe(innerObserver); 
        } 
 
        @Override 
        public void onError(Throwable e) {
    
            observer.onError(e); 
        } 
 
        @Override 
        public void onComplete() {
    
            observer.onComplete(); 
        } 
 
        private static class InnerObserver<R> implements CustomObserver<R> {
    
            private CustomObserver<R> observer; 
 
            InnerObserver(CustomObserver<R> observer) {
    
                this.observer = observer; 
            } 
 
            @Override 
            public void onStart() {
    
 
            } 
 
            @Override 
            public void onNext(R result) {
    
                observer.onNext(result); 
            } 
 
            @Override 
            public void onError(Throwable e) {
    
                observer.onError(e); 
            } 
 
            @Override 
            public void onComplete() {
    
 
            } 
        } 
    } 
} 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/app/6252.html

(0)
上一篇 2021年7月17日 00:44
下一篇 2021年7月17日 00:44

相关推荐

发表回复

登录后才能评论