【译】kotlin 协程 Flow:给 RxJava 使用者的介绍

RxJava 可能是我使用的最重要的库,Rx 通常是编写代码的另一种范式, Kotlin 作为一种新的编程语言,使它可以轻松实现将协程驱动的 flow 实现为自己的 Rx 实现。 我可能在 Hello Kotlin Coroutines 中介绍了协程,这对于理解 flow 很有必要

Kotlin 具有一组扩展,以方便使用集合。 但它不是响应式的

listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi")
    .map { it.length }
    .filter { it > 4 }
    .forEach {
        println(it)
    }
复制代码

在此示例中,如果您深入研究 map 函数源代码,您将发现这里没有魔法,它只是列表的循环,进行了一些转换然后为您提供了一个新列表。 过滤器也一样。 这种机制称为 eager evaluation ,该函数在整个列表中进行操作并提供一个新列表。 但是如果我们不需要创建这些临时列表以节省一些内存,那我们可以使用 Sequences

listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi")
	// 使用 Sequence
    .asSequence()
    .map { it.length }
    .filter { it > 4 }
    .forEach {
        println(it)
    }
复制代码

这里的区别就是先调用 asSequence 方法,然后使用我们的操作,再次 查看 map 方法后,我们发现了一些不同之处,它只是 sequence 的修饰符,返回值类型也是 sequence 。 使用 sequence map 时,只能一项一项地进行操作。列表较大时, sequence 比普通集合要好得多。 sequence 可以同步完成其工作,有没有办法异步使用那些转换运算符呢?答案是 flow

flow

如果我们尝试获取列表并将其用作 flow ,并在流的末尾调用 collect {..} ,则会收到编译错误。 由于 flow 是基于协程构建的,因此默认情况下它具有异步功能,因此您可以在代码中使用协程时使用它

collect {…} 运算符,您可以将其想像为 Rxjava 中的 subscribe

流也是 cold stream ,这意味着,直到您调用操作符(如 collect)后,flow 才会被执行。 如果您重复调用 collect ,每次您将获得相同的结果

因此,Collections 扩展功能仅适用于小数据, sequence 可以节省您不必要的工作(不创建临时列表),而使用 flow,您可以用协程的强大功能来编写代码。 因此,让我们学习如何构建它

构建 flow

我们看到 asFlow 方法,它是 Collections 上的扩展函数,可将其转换为 flow,我们查看一下源码

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}
复制代码

如果我们要编写前面的示例在数据源中添加一些逻辑,则只需使用 flow{…} 或者 flowof()

转换操作符

flow 拥有一些列的用于转换的运算符,例如 mapfiltergroupByscan 等等

在由 Coroutines 提供支持的 flow 中,您可以自然地在您的操作符中使用异步代码,假设我们想要做一些耗时的操作,这里使用延迟一秒钟表示。 使用 RxJava 时,您可以使用 flatmap

这里想表达的是 flow 具有更简单的设计,并且与以其陡峭的学习曲线而闻名的 RxJava 相比易于学习,我在此使用 flow 将它简化一下

terminal 操作符

我已经提到 collect() 是 terminal operator,当您在调用它时得到结果,在 RxJava 中,您可以通过调用 subscribe() 来启动它,或者使用阻塞的方式,调用 blockingGet

flow 中的 terminal operator 是需要作用域操作的挂起函数,其他的 operator 例如

  • toList(),toSet -> 返回集合中的所有 item

  • first() -> 仅返回第一个发射

  • reduce(),fold() -> 使用特定操作获取结果

发射数据

为了发射数据,您需要使用一个挂起函数

//fire a coroutine
someScope.launch {
  //fire flow with a terminal operator
  flowSampleData().collect { }
}
复制代码

上面的花括号让人想起了回调,您可以使用 launchIn 函数,处理结果可以使用 onEach{...}

flowSampleData()
    .onEach {
     //handle emissions
    }
    .launchIn(someScope)
复制代码

取消

每次设置 RxJava 订阅时,我们都必须取消这些订阅以避免内存泄漏或过期的任务在后台运行, RxJava 提供对订阅的引用( disposable )来取消订阅, disposable().dispose() 。如果您在 CompositeDisposable 使用了多个对象,则调用 clear()dispose()

对于 flow 使用特定 scope 的协程则可以无需进行额外的工作来达到此目的

错误处理

RxJava 最有用的功能之一就是处理错误的方式,您可以使用此 onError() 函数捕获工作流中的任何错误。 flow 有一个类似的称为 catch {…} ,如果不使用 catch {…} ,则您的代码可能会引发异常或应用崩溃。 您就可以选择使用常规 try catch 或使用 atch {…} 以声明方式进行编码

让我们模拟一个错误

private fun flowOfAnimeCharacters() = flow {
    emit("Madara")
    emit("Kakashi")
    // 抛出异常
    throw IllegalStateException()
    emit("Jiraya")
    emit("Itachi")
    emit("Naruto")
}
复制代码

使用

runBlocking {
    flowOfAnimeCharacters()
        .map { stringToLength(it) }
        .filter { it > 4 }
        .collect {
            println(it)
        }
}
复制代码

如果我们运行此代码,它将引发异常,并且如我们所说,您有两个选项可以处理错误,即常规 try-catchcatch {…} 。 这是两种情况下的修改代码

// 使用 try-catch
runBlocking {
    try {
        flowOfAnimeCharacters()
            .map { stringToLength(it) }
            .filter { it > 4 }
            .collect {
                println(it)
            }
    } catch (e: Exception) {
        println(e.stackTrace)
    } finally {
        println("Beat it")
    }
}
复制代码
// 使用 catch{}
runBlocking {
    flowOfAnimeCharacters()
        .map { stringToLength(it) }
        .filter { it > 4 }
         // catch
        .catch { println(it) }
        .collect {
            println(it)
        }
}
复制代码

使用 catch{} 需要注意的是 catch{} 操作符的放置顺序,它要放置在 terminal operator 之前,这样您才可以捕获想要的异常

恢复

如果错误中断了流,并且我们打算使用完整备份或默认数据恢复流,在 Rxjava 中使用 onErrorResumeNext()onErrorReturn() ,在 flow 中,我们还是使用 catch {…} ,但我们在其中调用了 emit() 来逐个生成备份,甚至我们可以使用 emitAll() 引入一个全新的 flow,例如如果中途出现了异常,我们需要“ Minato” 和 “ Hashirama”

runBlocking {
    flowOfAnimeCharacters()
        .catch {
            emitAll(flowOf("Minato", "Hashirama"))
        }
        .collect {
            println(it)
        }
}
复制代码

那么得到的结果是

Madara
Kakashi
Minato
Hashirama
复制代码

flowOn()

默认情况下,flow 数据源将在调用者上下文中运行,如果要更改它,例如,要使 flow 在 IO 而不是 Main 上运行,则使用 flowOn() ,并更改上游的上下文,上游是调用 flowOn 之前的全部操作符。 这是一个很好的文档示例

这里的 flowOn() 充当 RxJava 中的两个角色 [subscribeOn() — observeOn()] ,您可以编写流然后确定将在哪个上下文中进行操作

完成

当 flow 完成发射时,您可能需要执行一些操作, onCompletion {…} 可以解决这一问题,并且它确定 flow 是正常完成还是异常完成

已知数据源如下

private fun flowOfAnimeCharacters() = flow {
    emit("Madara")
    emit("Kakashi")
    throw IllegalStateException()
    emit("Jiraya")
    emit("Itachi")
    emit("Naruto")
}

复制代码

catch {…} 的工作就是捕获 IllegalStateException() 并重新开始新流程,这使我们从源头上留下“ Madara”,“ Kakashi”,在后面留下“ Minato”,“ Hashirama”。 但是 onCompletion {…} 会显示错误吗?

答案是否定的,catch 捕获了所有错误,接下来是全新的事情,请记住 onCompletion {…}catch {…} 只是中介程序运算符。 它们的顺序很重要

总结

您可以使用 Flow builders 构建 flow,其中最基本的是 flow{…} 。 如果要开始该 flow,请调用诸如 collect {…} 之类的 terminal operator ,并且由于 terminal operator 是挂起函数,因此需要使用协程构建器 launch {…} 的作用域,或者如果您想要以优雅的风格进行操作, 您可以结合使用 launchIn()onEach {…} 。 使用 catch {…} 捕获上游错误,并根据需要提供回退流程。 onCompletion {..} 将在上游完成所有发射之后或发生错误时触发。 默认情况下,所有这些方法都适用于调用程序协程上下文,如果要更改上游上下文,请使用 flowOn()

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章