通过RxJava看kotlin协程(二)

Scheduler 与 Worker 在 RxJava2 中是一个非常重要的概念,他们是 RxJava 线程调度的核心与基石。Scheduler主要负责的就是一件事情,定义好每个流模块的执行线程。

源码追溯

源码分析我们先从subscribeOn方法开始。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {    
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");   
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
复制代码

上述代码可以看出,subscribeOn的核心代码是ObservableSubscribeOn,这个类只做了一件事情,它会把上一个流用装饰者模式包装了一下,当上一个流被执行的时候会将流执行到scheduler的线程上去。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}
复制代码

ObservableSubscribeOn的subscribeActual方法执行之后,scheduler.scheduleDirect(new SubscribeTask(parent))通过这段代码将当前流运行到Scheduler的线程内。

之后我们可以看下Scheduler的实现累,RxAndroid的HandlerScheduler,看看对于安卓的调度器,RxJava是怎么写的。

@Overridepublic 
Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");    
    if (unit == null) throw new NullPointerException("unit == null");    
    run = RxJavaPlugins.onSchedule(run);    
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);    
    handler.postDelayed(scheduled, unit.toMillis(delay));    
    return scheduled;
}
复制代码

scheduler.scheduleDirect执行的时候就会调用scheduleDirect方法。看得出来,方法被触发之后调用了handler的postdelay方法,将当前的Runnable运行到该handler的线程上去。

static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
复制代码

接下来我们要说另外一个Worker,上面是IoScheduler里面的Worker代码。Work在初始化的时候会去生成一个线程池,这个线程池就是我们后续schedule执行的地方,当一个Runnnable被调度到这个work上的时,会调用schedule方法,然后将这个Runnnable运行到这个线程池上去。

协程 Dispatcher

释义

协程上下文(coroutine context)包含一个协程调度器(参阅 CoroutineDispatcher),协程调度器 用于确定执行协程的目标载体,即运行于哪个线程,包含一个还是多个线程。协程调度器可以将协程的执行操作限制在特定线程上,也可以将其分派到线程池中,或者让它无限制地运行。

源码追溯

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    // compute new context
    val oldContext = uCont.context
    val newContext = oldContext + context
    // always check for cancellation of new context
    newContext.checkCompletion()
    // FAST PATH #1 -- new context is the same as the old one
    if (newContext === oldContext) {
        val coroutine = ScopeCoroutine(newContext, uCont)
        return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
    }
    // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
    // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
    if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
        val coroutine = UndispatchedCoroutine(newContext, uCont)
        // There are changes in the context, so this thread needs to be updated
        withCoroutineContext(newContext, null) {
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
    }
    // SLOW PATH -- use new dispatcher
    val coroutine = DispatchedCoroutine(newContext, uCont)
    coroutine.initParentJob()
    block.startCoroutineCancellable(coroutine, coroutine)
    coroutine.getResult()
}
复制代码

这是协程的线程调度的代码,当发现当前的调度器和目标调度器不是同一个的情况下,会new一个DispatchedCoroutine,开始进行线程的调度操作。

coroutine.initParentJob()初始化父任务,这个方法需要一开始就被初始化调用的。 然后就是核心关键将block.startCoroutineCancellable(coroutine, coroutine),该方法会创建一个新的可挂起线程。 进行异步等待操作,当有值的情况下会回调将当前挂起结束,进行下一步获取值操作,然后将当前的线程返回。

在调用withContext方法的时候因为我们传入的是Dispatchers.Main

@JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
复制代码

而DispatcherMain是可以有外部的fatroy构造的,由安卓的kotlin支持库中可以发现,其实现类是AndroidDispatcherFactory。

internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")

    override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}
复制代码

这次真的转过来了把,没有干扰了把,各位老哥,我要给你们跪了啊。

接下来我们看下重头戏HandlerContext,这个类就是和rxjava的HandlerScheduler基本一模一样的线程调度器。

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    /**
     * Creates [CoroutineDispatcher] for the given Android [handler].
     *
     * @param handler a handler.
     * @param name an optional name for debugging.
     */
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null

    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }

    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }

    override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        return object : DisposableHandle {
            override fun dispose() {
                handler.removeCallbacks(block)
            }
        }
    }

    override fun toString(): String =
        if (name != null) {
            if (invokeImmediately) "$name [immediate]" else name
        } else {
            handler.toString()
        }

    override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler
    override fun hashCode(): Int = System.identityHashCode(handler)
}

复制代码

DispatchedCoroutine在上面的这个挂起函数的父类CoroutineDispatcher,会调用dispatch方法,进行线程切换操作,然后是不是和上面的rxjava 有点似曾相似的感觉。

没错,各位看官,这次调用了handler.post(block)。所以我这次我真的下结论了,上篇文章是有点小微妙,但是这次应该没事清楚了。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章