『译』Kotlin Flows 与 Reactive Streams —— 青出于蓝的历史

转载请注明原文地址: juejin.im/editor/post…

原文标题: Reactive Streams and Kotlin Flows

原文作者: Roman Elizarov

原文发布于:2019-06-10

响应式扩展(简称为 ReactiveX 或者 Rx)这一概念,由 Erik Meijer 首次在 .NET 中提出,并于 2010 公之于众。它是一个新的异步数据流 API 的构建趋势,即采用包含发射元素(onNext)、流执行完成(onCompleted)、出现错误(onError)的回调,同时引入了像 map 和 filter 等的流式处理操作符,让流的数据处理变得和使用集合一样容易。

基于观察者的 Rx 提供了更优的性能比传统基于迭代的数据处理 API。此外,Rx 主张使用“冷流”这一思想。它在那时是一个十分新颖的思想,因为在当时命令式语言处于业界主流,大多数的数据处理 API 都是“热的”。在资源管理方面,“热流”有很多弊端(一旦你打开它,你必须不要忘记关闭它),然而“冷流”提供了一个优雅的解决方案(可参考文章: Cold flows, hot channels )。

当下 Rx 不断普及同时它被移植到了许多编程语言中,其中就包括 Java ,因此它就可以运用在最大的编程环境 JVM 中了。在 2013 年,Rx 移植到 Java 被称为 RxJava。于 2014 年发布了 1.0 稳定版本。

响应式流(Reactive Streams)

期间,在 JVM 平台还有另外两个项目 AkkaProject Reactor ,它们是致力于基于事件的异步系统,然而它们都面临一个共同的控制流问题 —— 背压。这样就促成了一次合作,为了提供一个标准的接口集合来解决 JVM 使用响应式数据流伴随的背压问题。它被称为“响应式流” 新方案,同时 Viktor Klang 发布了一个 重大的会谈 记录下了它的历史进程。响应式流的 1.0 版本于 2015 年发布。

响应式流 是一个令人钦佩的工程。它为 JVM 世界带来了支持背压的异步事件流,否则它很难支持异步性。它是一个纯基于库的壮举,引入大量的必须严格遵循的契约。本文不再详述这些,当你使用一些众所周知的专家构建的操作符,它就可以完美的运行,但是如果你自己去写响应式流的操作符,要遵守所有的这些契约是一个巨大的挑战。

Kotlin Flows

在 2018 年 Kotlin 编程语言发布了 协程 ,作为一个专门针对异步编程的语言普遍特征。在 Kotlin 中,“挂起”(suspension)这个概念,是一个与生俱来的流控制方案。把它与基于观察者的 Rx 的具有“冷流”思想相结合,就可以深入理解 Kotlin Flows 了。

我们致力于 Kotlin Flows 的一系列工作就是以实现一个简单的设计为目标。这个设计就是,仅仅只需要很少的基本构建单元,就可以编写你自己的操作符。比如:想要为没一个值都延迟一秒 ?没问题,使用基本的 flow 构建者,以及 collect 函数,就能实现:

fun <T> Flow<T>.delayASecond() = flow {

    collect { value -> // collect from the original flow
        delay(1000) // delay 1 second
        emit(value) // emit value to the resulting flow
    }
}
复制代码

你并不会看到显示任何有关处理背压的代码,因为它已经自动的在幕后完成了,这一切都归功于 Kotlin 编译器提供了对“挂起”(suspension)的支持。

并发性结构

从无到有的设计 Kotlin Flows,使得我们也有机会来减少之前响应式流有关的一些模板代码。比如:当订阅一个响应式流,你最终会持有一个 “订阅”(Subscription)对象的引用,你必须很小心的管理它,以便你可以取消这个订阅,否则你可能有泄漏它的风险。这个问题和 并发性结构 正在解决的问题是非常类似的,而在 Flow 的设计中你不用担心这些,你并不会因为不小心导致泄漏一个订阅。

Kotlin Flow 没有任何订阅的概念。挂起和轻量级的协程前来救援。Flow 的 collect 操作符最像一个订阅,但是它仅仅是一个挂起的函数,归功于并发性结构,只要不是滥用它,调用它是很难引发泄漏的。

collect 操作符是基于挂起的设计,因此不再需要每次都单独设置 onError 和 onCompleted 回调。(译者注:可以回想一下 Kotlin 的 suspend 函数,它的设计初衷就是不用回调,让异步代码可以像同步代码一般无二的编写。)想要在流正常完成后执行一些操作 ?那么就在 collect 正常完成后做就好了:

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    // reemit all values from the original flow
    collect { value -> emit(value) }
    // this code runs only after the normal completion
    action()
}
复制代码

前事不忘后事之师

在我们设计期间,通过研究现有的响应式流应用代码,给我们带来了极大的好处。例如,我们看到涉及执行上下文切换都有共同的代码模式(无处不在的 subcribeOn / observeOn),从而仅仅采用一个简单的 flowOn 操作符,来完成了同样的机制(详情可参考: Execution context of Kotlin Flows )。

为此,在核心库中,我们还可以奢侈的不去实现所有可以想到的操作符。我们仅仅挑选了最流行和最基本的来实现,因为 Kotlin 支持扩展函数(参见: Extension-oriented design ),把它与 Flow 的简化设计相结合,可以很容易的创建自定义操作符,就如同使用内置的操作符一般无二。

集成

Kotlin Flows 仍然是响应式流的理念。尽管它们是基于挂起的而且没有直接实现相关的接口,但是它这样的设计,可以让它直截了当的与基于响应式流的系统集成起来。我们提供了开箱即可用的 flow.asPublisher() 扩展函数来把 Flow 转换为响应式流的 Publisher 接口,以及 publisher.asFlow() 扩展来实现反向转换。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章