RxJava源码分析之线程调度(二)

By - Last updated: 星期日, 八月 6, 2017
作者 TripleZ 2017.08.04 16:13* 字数 942

在上一篇文章当中我们把RxJava的上游线程切换的源码都大致梳理了一遍,如果还没有看的请猛戳这里,但是光有上游的线程切换是不足以让我们完成在实际项目中的应用的,绝大多数时候我们都需要在下游进行线程的切换来处理上游在其他线程中得到的结果。所以现在我们就来分析一下RxJava源码中是如何实现对下游线程的切换控制管理的。

这里我们一切换到Android主线程为例:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

现在就从observeOn(AndroidSchedulers.mainThread())入手,探探究竟。
首先我们来看一下RxJava是如何得到一个Android主线程的Scheduler的即HandlerScheduler。我们点进源码看一下:

/** Android-specific Schedulers. */
public final class AndroidSchedulers {

    private static final class MainHolder {
        //创建一个Handle拿到主线程的Looper 创建默认的 HandlerScheduler
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    //该Callable默认返回的就是上面的HandleScheduler
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        //这里就是入口 可以看到其实该方法是直接获取到了一个静态的Scheduler常量。
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

好了现在Scheduler有了,我们继续分析observeOn方法。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

看到了吧,RxJava所有的代码基本都是一致的,桥接模式,这里看到是创建了一个ObservableObserveOn对象,当然第二个参数默认是false,表明了如果执行了onError() 将会重新发送一遍上游的事件序列,第三个参数是缓存的大小默认是128。我们点进ObservableObserveOn的构造方法看看里面都做了什么,很关键。

//可以看到套路基本都是一样的, ObservableObserveOn<T> 同样是继承于AbstractObservableWithUpstream<T, T> ,用来保存上游的原事件流。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
//订阅的真正发生之处
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {//肯定不是这个Scheduler啊,我们这里是HandleScheduler
            source.subscribe(observer);
        } else {
            //创建HandlerScheduler的Worker,HandlerWorker.
            Scheduler.Worker w = scheduler.createWorker();
            //上游事件和下游事件产生订阅,这里又是一个包装类ObserveOnObserver包装了下游真正的Observer。
           //我们到ObserverOnObserver里面去看看,其是一个静态内部类
          //这里是把worker,delayError,bufferSizew也都传了进去
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
  //实现了Runnable接口,继承于BasicIntQueueDisposable
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }
              //事件的缓存队列 确定了缓存队列的大小
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
              //执行真正的onSubscribe方法
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
         //开始调度    
        schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
             //开始调度 
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;//已经完成
            //开始调度  
            schedule();
        }
      //取消订阅
        @Override
        public void dispose() {
            if (!cancelled) {
                cancelled = true;
                s.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }
      //判断是否被取消订阅
        @Override
        public boolean isDisposed() {
            return cancelled;
        }
      //执行调度的方法 
        void schedule() {
            if (getAndIncrement() == 0) {
              //传入当前ObserveOnObserver对象,其实现了Runnable接口
                worker.schedule(this);
            }
        }
      
        void drainNormal() {
            int missed = 1;
            //缓存数据的队列
            final SimpleQueue<T> q = queue;
            //实际下游的Observer
            final Observer<? super T> a = actual;

            for (;;) {
                //检测事件是否被终止,如果终止了直接跳出循环
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    //标记事件是否完成
                    boolean d = done;
                    T v;

                    try {
                        //拿到队列里的第一个事件
                        v = q.poll();
                    } catch (Throwable ex) {
                        //发生异常了 做一系列的后续动作
                        //取消订阅,队列的制空,发送异常事件,取消线程调度,最后跳出循环
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    //判断事件是否为空
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    //为空直接进入下一轮循环
                    //因为上游的事件处理也是需要时间的,上游的执行有可能是非常大量的数据所以可能会出现缓存队列里面暂时没有事件,所以这里需要一直进行循环去等待新的事件产生
                    if (empty) {
                        break;
                    }
                    //发送事件
                    a.onNext(v);
                }
              //下面这段代码我也不是很确定他的意思,这里我说一下我自己的理解不知道正不正确:
              //因为ObserveOnObserver是继承于BasicIntQueueDisposable ,而BasicIntQueueDisposable 又继承了AtomicInteger,一个原子操作类
            //用一个Integer整数来控制当前ObserveOnObserver对象的并发操作
            //如果当前ObserveOnObserver对象没有被其他线程独占,那么该对象就自己持有的话(代表已经执行完了当前的事件),就可以执行addAndGet(int i)方法了。
            //执行完改方法对自己的负数相加那么最终得出的是0,为0的话就可以开始下一个循环了,那么以后的每一个循环missed的值都为0都可以直接break!
            //最终要的是addAndGet()是一个阻塞式的方法,如果不成功的话,它会重新执行一遍
          //所以我分析得出这里其实是一个控制标记位“好了!现在轮到你了,开始吧”当第一次拿到权限后就可以一直执行下去了。
        
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        void drainFused() {
   ...........
        }
        //具体的run方法内部
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                //去处理队列里面缓存的数据
                drainNormal();
            }
        }
        //检查是否终止  代码都很简单 我就不做注释了
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
    }
}

同样是装饰模式,关键就是每当执行onNext(),onError(),onCompleted()方法的时候,都会开启线程的调度,上游的每一次事件,都会在指定线程中处理,这就是核心。然后就执行了具体的Worker实现类里面的schedule方法,我们一起看一下。

//HandlerWorker里面的schedule方法,其第二个参数为0L,第三个参数为TimeUnit.NANOSECONDS。
 @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                //判断是否取消订阅了
                return Disposables.disposed();
            }
            //满篇飞的Hook函数 +_+
            run = RxJavaPlugins.onSchedule(run);
            //封装当前持有主线程Looper的handler和ObserveOnObserver对象
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            //创建Message
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            //给主线程发送消息
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            //判断是否取消订阅了
            if (disposed) {
            //如果取消订阅了 就remove掉消息处理的回调接口
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

当然了最后主线程的执行的程序是ScheduledRunnable里面的run()方法,代码如下:

 @Override
        public void run() {
            try {
              //ObserveOnObserver对象的run方法
                delegate.run();
            } catch (Throwable t) {
                //捕获异常了进行一系列处理
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

这样RxJava就实现了把上游发送的每一个事件都巧妙地转换到了指定线程中处理,此处是Android主线程。
可以看到如果你在下游多次调用observeon()的话线程是会一直切换的,这也是网上一直说的结论。每一次切换线程,都会把对应的Observer对象的各项处理方法的处理执行在制定线程当中。
大概浏览完源码你会发现,RxJava的设计者真的是把面向对象的思想用到了极致,抽象接口与实体,设计模式地巧用都无处不在,感叹自己要学的真的还有太多,如果让我来写不知道还要多少年才能写出如此牛B的代码。
这也算是我第一次写源码分析的文章,还有很多地方有待提高,最开始听说别人源码分析很重要,不光要会用那些优秀的Library更要理解其中的精髓,与是我傻乎乎地闷着脑袋去看,结果真的看不懂,后来看了一本书叫做《Android源码设计模式》才恍然大悟,设计模式地巧用在各大优秀的开源Library中无处不在,只有真正地理解了设计模式,精通架构,才能写出如此优秀的代码。最后再安利一本书《设计模式之禅》这本书很有意思,作者语言幽默风趣,像看连环画一样很有意思。
哈哈 废话说了一大堆了,如果上面我的分析有误的话,欢迎指正批评,有什么不懂得地方也可以一起探讨。

最后

没有最后了,大家再见~~~

发表在 其他 • Tags: , , , ↑Top 文章来源