Kotlin 基础 | 为什么要这样用协程?

看上去连续的一段代码,执行起来却走走停停,不同的子代码段还可能执行在不同的线程上。协程就是用这种方式来实现 异步

最开始,在没有协程和各种异步工具时,只能这样实现异步:

// 构建主线程 Handler
val mainHandler = Handler(Looper.getMainLooper())
// 启动新线程
val handlerThread = HandlerThread("user")
handlerThread.start()
// 构建新线程 Handler
val handler = Handler(handlerThread.looper)
// 把"拉取用户信息"通过 Handler 发送到新线程执行
handler.post(object : Runnable {
    override fun run() {
        val user = fetchUser() //执行在新线程
        // 把用户信息通过 Handler 发送到主线程执行
        mainHandler.post(object : Runnable {
            override fun run() {
                tvName.text = user.name //执行在主线程
            }
        })
    }
})
Log.v("test", "after post") // 会立刻打印(主线程不被阻塞)

fun fetchUser(): User {
    Thread.sleep(1000) //模拟网络请求
    return User("taylor", 20, 0)
}
复制代码

这段代码从网络获取用户数据并显示在控件上。

代码的不同部分会执行在不同线程上:拉取用户信息的耗时操作会在 handlerThread 线程中执行,而界面显示逻辑在主线程。

这两个线程间步调不同(异步),即互不等待对方执行完毕再执行自己的后续代码(不阻塞)。它们通过进程间互发消息实现了异步。

这样写的缺点是在同一层次中暴露太多细节!构建并启动线程的细节、线程切换的细节、线程通信的细节、网络请求的细节。 这些本该被隐藏的细节统统在业务层被铺开。

若改用 RxJava 就可以屏蔽这些细节:

userApi.fetchUser()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(
            { user ->
                tvName.text = user.name
            },
            { error ->
                Log.e("error","no user")
            }
        )
复制代码

RxJava 帮我们切换到 IO 线程做网路请求,再切换回主线程展示界面。线程间通信方式也从发消息变为回调。代码可读性瞬间提升。

若需求改成“获取用户信息后再根据用户 ID 获取其消费流水”,就得使用 flatMap() 将两个请求串联起来,此时不可避免地出现嵌套回调,代码可读性下降。

若用协程,就可以像写同步代码一样写异步代码:

launch()

class TestActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.text)

        // 启动顶层协程
        GlobalScope.launch {
            // 拉取用户信息(挂起点)
            val user = fetchUser()
            // 拉取用户账单(挂起点)
            val bills = fetchBill(user)
            // 展示用户账单(UI操作)
            showBill(bills)
        }
        Log.v("test", "after launch") // 立刻打印(主线程不被阻塞)
    }

    // 挂起方法
    suspend fun fetchUser(): User {
        delay(1000) // 模拟网络请求
        return User("taylor", 20, 0)
    }

    // 挂起方法
    suspend fun fetchBill(user: User): List<Bill> {
        delay(2000) // 模拟网络请求
        return mutableListOf(Bill("Tmall", 10), Bill("JD", 20))
    }
}
复制代码

GlobalScope.launch() 启动了一个协程,主线程不会被阻塞(“after launch”会立即打印)。其中 GlobalScopeCoroutineScope 的一个实现。

CoroutineScope 称为 协程领域,它是协程中最顶层的概念,所有的协程都直接或间接的依附于它 ,它用于描述协程的归属,定义如下:

// 协程领域
public interface CoroutineScope {
    // 协程上下文
    public val coroutineContext: CoroutineContext
}

// 协程领域的静态实现:顶层领域
public object GlobalScope : CoroutineScope {
    // 空上下文
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
复制代码

协程领域 持有 CoroutineContext

CoroutineContext 称为 协程上下文 ,它是“和协程执行相关的 一系列 元素的集合”,其中最重要的两个是 CoroutineDispatcher (描述协程代码分发到哪个线程执行)和 Job (代表着协程本身)。

协程领域 有一个静态实现 GlobalScope ,它用于创建顶层协程,即其生命周期同 App 一致。

协程的启动方法被定义成 CoroutineScope 的扩展方法:

/**
 * 启动一个新协程,它的执行不会阻塞当前线程。默认情况下,协程会被立即执行。
 *
 * @param context 在原有协程上下文基础上附加的上下文
 * @param start 协程启动选项
 * @param block 协程体,它会在协程上下文指定的线程中执行
 **/
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,// 默认为空上下文
    start: CoroutineStart = CoroutineStart.DEFAULT, // 默认启动选项
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
}
复制代码

启动协程时,必须提供参数 block (协程体),即在协程中执行的代码段。

Demo 在协程体中先后调用了两个带 suspend 的方法。

suspend 方法称为 挂起方法 。挂起的对象是其所在协程,即协程体的执行被暂停。被暂停的执行点称为 挂起点 ,执行挂起点之后的代码称为 恢复

Demo 中有两个挂起点:在用户信息不返回之前,拉取账单就不会被执行,在拉取账单不返回之前,就不会把数据填充到列表中。

withContext()

执行下 Demo,看看效果:

android.view.ViewRootImpl$CalledFromWrongThreadException: 
    Only the original thread that created a view hierarchy can touch its views.
复制代码

崩溃原因是“展示账单逻辑被执行在非UI线程”。 GlobalScope.launch() 将协程体调度到新线程执行,执行完耗时操作后,UI 展示时还需要调度回主线程:

GlobalScope.launch {
    val user = fetchUser()
    val bills = fetchBill(user)
    withContext(Dispatchers.Main) {
        showBill(bills)
    }
}
复制代码

withContext() 是一个顶层挂起方法:

public suspend fun <T> withContext(
    context: CoroutineContext,// 指定 block 被调度到哪个线程执行
    block: suspend CoroutineScope.() ->  // 被调度执行的代码段
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    ...
}
复制代码

它用于在协程中切换上下文(切换协程体执行的线程)。 withContext() 会挂起当前协程(它是一个挂起点),直到 block 执行完,协程才会在自己原先的线程上恢复执行后续代码。

async()

上面的例子是两个串行请求,如果换成“等待两个并行请求的结果”,可以这样写:

GlobalScope.launch {
    val a = async { fetchA() }
    val b = async { fetchB() }
    a.await() // 挂起点
    b.await() // 挂起点
    Log.v("test","result=${a+b}")// 当两个网络请求都返回后才会打印
}

suspend fun fetchA(): String {
    ...// 网络请求
}

suspend fun fetchB(): String {
    ...// 网络请求
}
复制代码

在顶层协程中又调用 async() 启动了2个子协程:

// 启动协程,并返回协程执行结果
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    ...
}
复制代码

aync() 也是 CoroutineScope 的扩展方法,和 launch() 唯一的不同是它引入了泛型,用于描述协程体执行的结果,并将其包装成一个 Deferred 作为返回值:

public interface Deferred<out T> : Job {
    // 挂起方法: 等待值的计算,但不会阻塞当前线程,计算完成后恢复当前协程执行
    public suspend fun await(): T
}
复制代码

调用 async() 启动子协程不会挂起外层协程,而是立即返回一个 Deferred 对象,直到调用 Deferred.await() ,协程的执行才会被挂起。当协程在多个 Deferred 对象上被挂起时,只有当它们都恢复后,协程才继续执行。这样就实现了“等待多个并行的异步结果”。

coroutineScope()

如果多个并行的异步操作没有返回值,如何等待它们都执行完毕?

GlobalScope.launch {
    // 挂起外层协程
    coroutineScope { // 和外层协程体执行在同一个线程中
        launch { updateCache() }
        launch { insertDb() }
    }
    Log.v("test", "after coroutineScope()") // 被coroutineScope阻塞,等其执行完毕才打印
}

suspend fun updateCache() {
    ...// 更新内存缓存
}

suspend fun insertDb() {
    ...// 插入数据库
}
复制代码

coroutineScope() 创建了一个协程并阻塞当前协程,在其中调用 launch() 创建了2个子协程,只有当2个子协程都执行完毕后才会打印 log。

coroutineScope() 声明如下:

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
    ...
}
复制代码

coroutineScope() 有如下特点:

  1. 返回计算结果

  2. 阻塞当前协程

  3. 执行在和父协程相同的线程中

  4. 它等待所有子协程执行完毕

coroutineScope() 是 withContext() 的一种情况,当给 withContext() 传入当前协程上下文时,它和 coroutineScope() 一模一样。它也会返回计算结果,也会阻塞当前线程,也会等待所有子协程执行完毕。

换句话说, coroutineScope() 是 不进行线程调度的 withContext()

GlobalScope的罪过

虽然上面这些代码都是正确的,但它们不该出现在真实项目中。

因为它们都使用 GlobalScope.launch() 来启动协程。这样做会让管理协程变得困难:

GlobalScope.launch() 构建的协程是独立的,它不隶属于任何 CoroutineScope 。而且是静态的,所以生命周期和 APP 一致。

一旦被创建则要等到 APP 关闭时才会释放线程资源。若在短生命周期的业务界面使用,需纯手动管理生命周期,不能享受 structured-concurrency

structured-concurrency 是一种并发编程范式,它是管理多线程并发执行生命周期的一种方式,它要求“执行单元”的孵化要有结构性,即新建的“执行单元”必须依附于一个更大的“执行单元”。这样就便于管理(同步)所有执行单元的生命周期。

Kotlin 协程实现了该范式,具体表现为:

  1. CoroutineScope
    Job
    CoroutineScope
    Job
    
  2. Job 被结束时,所有子 Job 立马被结束(即使还未执行完)。
  3. Job 会等待所有子协程都结束了才结束自己。
  4. Job
    Job
    Job
    Job
    

先看一个手动管理协程生命周期的例子:如果一个 Activity 所有的协程都通过 GlobalScope.launch() 启动,那在 Activity 退出时,该如何取消这些协程?

办法还是有的,只要在每次启动协程时保存其 Job 的引用,然后在 Activity.onDestroy() 时遍历所有 Job 并逐个取消:

class TestActivity : AppCompatActivity(){
    // 持有该界面中所有启动协程的引用
    private var jobs = mutableListOf<Job>()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // 启动顶层协程并保存其引用
        GlobalScope.launch {
            ...
        }.also { jobs.add(it) }
    }
    
    override fun onMessageReceive(msg: Message) {
        // 启动顶层协程并保存其引用
        GlobalScope.launch {
            ...
        }.also { jobs.add(it) }
    }
    
    override fun onDestroy() {
        super.onDestroy()
        // 将所有协程都取消以释放资源
        jobs.forEach { it.cancel() }
    }
}
复制代码

每一个 GlobalScope.launch() 都是独立的,且它不隶属于任何一个 CoroutineScope 。为了管理它们就必须持有每个启动协程的引用,并逐个手动释放资源。

若使用 structured-concurrency 范式就可以让管理变简单:

class TestActivity : AppCompatActivity(), CoroutineScope by MainScope() {{
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        launch { ... }
    }
    
    override fun onMessageReceive(msg: Message) {
        launch { ... }
    }
    
    override fun onDestroy() {
        super.onDestroy()
        cancel() 
    }
}
复制代码

Activity 实现了 CoroutineScope 接口并将其委托给 MainScope() :

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
复制代码

MainScope() 是一个顶层方法,它新建了一个 ContextScope 实例,并为其指定上下文,其中一个是 Dispatchers.Main ,它是系统预定义的主线程调度器,这意味着, MainScope 中启动的协程体都会被调度到主线程执行。

launch()cancel() 都是 CoroutineScope 的扩展方法,而 Activity 实现了该接口并委托给 MainScope 。所以 Demo 中通过 launch() 启动的协程都隶属于 MainScopeonDestroy 中调用的 cancel() 取消了 MainScopeJob ,它的所有子 Job 也一同被取消。

Activity 被创建的时 CoroutineScope 同时被实例化,在 Activity 被销毁时,所有的协程也被销毁,实现了 协程和生命周期对象绑定。 再也不用担心后台任务完成后更新界面时,因 Activity 已销毁报空指针了。

协程可以和任何具有生命周期的对象绑定,比如 View,只有当 View 依附于界面时其对应的协程任务才有意义,所以当它与界面解绑时应该取消协程:

// 为 Job 扩展方法
fun Job.autoDispose(view: View) {
    // 判断传入 View 是否依附于界面
    val isAttached = Build.VERSION.SDK_INT >= Build.VERSION_CODES.KITKAT && view.isAttachedToWindow || view.windowToken != null
    // 如果 View 已脱离界面则直接取消对应协程
    if (!isAttached) {
        cancel()
    }

    // 构建 View 和界面绑定关系监听器
    val listener = object : View.OnAttachStateChangeListener {
        // 当 View 和界面解绑时,取消协程
        override fun onViewDetachedFromWindow(v: View?) {
            cancel()
            v?.removeOnAttachStateChangeListener(this)
        }

        override fun onViewAttachedToWindow(v: View?) = Unit
    }

    // 为 View 设置监听器
    view.addOnAttachStateChangeListener(listener)
    // 当协程执行完毕时,移除监听器
    invokeOnCompletion {
        view.removeOnAttachStateChangeListener(listener)
    }
}
复制代码

然后就可以像这样使用:

launch {
    // 加载图片
}.autoDispose(imageView)
复制代码

GlobalScope 无法和任何生命周期对象绑定(除 App 生命周期),除了这个缺点外,还有一个:

coroutineScope {
    GlobalScope.launch {
        queryA()
    }
    GlobalScope.launch {
        queryB()
    }
}
复制代码

queryB() 抛出异常时, queryA() 不会被取消。因为它们是通过 GlobalScope.launch() 启动的,它们是独立的,不隶属于外层 coroutineScope

但若换成下面这种方式, queryA() 就会被取消:

coroutineScope {
    launch {
        queryA()
    }
    launch {
        queryB()
    }
}
复制代码

因为这里的 launch() 都是外层 coroutineScope 对象上的调用,所以它们都隶属于该对象。当子协程抛出异常时,父协程会受到通知并取消掉所有其他子协程。

viewModelScope

上一节的代码虽然是正确的,但依然不该出现在真实项目中。因为 Activity 属于 View层 ,只应该包含和 UI 相关的代码,启动协程执行异步操作这样的细节不该在这层暴露。(架构相关的详细讨论可以点击 我是怎么把业务代码越写越复杂的 | MVP - MVVM - Clean Architecture

真实项目中,协程更有可能在 ViewModel 层出现。只要引入 ViewModel Kotlin 版本的包就可以轻松地在 ViewModel 访问到 CoroutineScope

implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0-alpha03"
复制代码
class MainViewModel : ViewModel() {
    fun fetchBean() {
        // 系统为 ViewModel 预定义的 CoroutineScope
        viewModelScope.launch { 
            ...
        }
    }
}
复制代码

viewModelScope 被定义成 ViewModel 的扩展属性,这种扩展手法颇为巧妙,限于篇幅原因,准备单独写一篇详细分析。

这篇仅粗略地介绍了协程相关的概念、协程的使用方式,及注意事项。依然留了很多疑惑,比如:

  1. 为啥要设定 CoroutineScope 这个角色?启动协程为啥要定义成 CoroutineScope 的扩展函数?
  2. CoroutineContext 的内部结构是怎么样的?为啥要这样设计?
  3. 协程是如何将协程体中的代码调度到不同线程执行的?

  4. 协程是如何在挂起点恢复执行的?

下一篇将更加深入阅读源码,解答这些疑问。

推荐阅读

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章