Future Promise 并发模型

随着并发程序占据越来越多的生产比例,传统的基于线程和任务的开发方式越来越难以表达任务之间的关联关系。为了表达任务之间的状态依赖,常常需要引入某种范围的全局变量,导致程序意外的耦合或者出现难以排查的问题。简化复杂并发程序的开发的需求日渐迫切。在这样的背景下,业界结合学界的成果进行了若干种并发模型的尝试和落地,其中之一就是 Future Promise 模型。

本文首先介绍 Future Promise 模型的基本概念,然后以若干典型的实现讨论其并发模式,最后,结合其他并发模型以及相互的互操作讨论其特点和优劣势。

Future 的基本概念

通常,Future 和 Promise 两个名词在这个并发模型里是混用的,无论是 Future 还是 Promise 都指代一个用于取得计算结果的代理或者计算结果的容器。从容器的视角看,该容器初始并不包含结果,仅当计算完成时被填充结果;从代理的视角看,仅当计算完成时可以通过该代理获得其结果。

在有些实现中,这两个概念是区分的。这种情况下,Future 往往指的是一个结果的只读占位符,也就是说持有 Future 可以作为结果的占位符组合更多的计算逻辑,但不能对计算结果本身产生影响。Promise 则指的是一个可以单次写的结果的容器。

Future 的典型实现

上面提到,Future 可以作为结果的占位符组合其他计算逻辑,但是对于仅实现最简单的 Future 语义即未来完成的值的版本来说,这种组合并不轻松。

Java 1.5 版本的 Future

我们看到 Java 1.5 引入的最初版本的 Future 的定义。

public interface Future<V> {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

}

Future 取消的细节在 《线程的取消和关闭》 一文中有所涉及,主要要知道 Future 的取消是不可靠的,此外 cancel 接口可以被子类重写以完成一些状态修改和资源清理的工作。 这里不再做展开。

除此以外,可以看到,除了生命周期的布尔判断之外,这个版本的 Future 实现仅支持唯一的 get 接口来阻塞地取得结果。如果结果暂未被填充进 Future 里,则阻塞等待。

这种接口设计导致如果我们通过 Future 表示某个计算的结果,为了组合这个结果,我们总需要最终通过 get 接口取得值,并将它和另一个值运行相关的计算。但是大多数并发程序并不希望这个阻塞等待发生在当前线程,于是我们需要在另一个线程中等待线程的结果。整体代码形如下面的代码块。

public Future<R> compose(Future<T> dep, Function<T, R> fn) {

Future<Integer> result = executorService.submit(() -> {

while (!dep.isDone()) {

Thread.onSpinWait();

}

return fn.apply(dep.get());

});

return result;

}

当然,这样的代码也不复杂。 但是一旦标准库没有提供,那么每个项目当中都要由开发者实现一遍。 即使同一个团队的项目能够抽取出公共工具方法,但是世界范围内的开发者却很有可能开发出形形色色的实现,这其中很容易甚至是必然混入一些细微的不同,使得相互之间代码的交互变得复杂甚至不可能。 从代码本身来说,我们需要再次向 ExecutorService 提交一个专门用于组合计算结果的作业,同时涉及到实际的阻塞过程,这无疑向开发者暴露了太多底层的细节,引入了更多的耦合和复杂度。

可以看到,上面这样的组合形式是直观而且通用的。同时,我们指出存在不同的此类胶水工具的实现将加大开发社区代码的割裂度。在这样的背景下,社区的标杆项目和三方库自然会考虑实现一套足够坚实的 Future 组合机制。这其中早期的尝试就是 Netty 和 Guava 等项目中的 Future 接口。我们将会看到它们采用了基于回调的组合方式。

Netty 版本的 Future

Netty 中的 Future 接口定义如下。

public interface Future<V> extends java.util.concurrent.Future<V> {

boolean isSuccess();

boolean isCancellable();

Throwable cause();

Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

}

可以看到,比起 Java 的 Future 接口多出了判断计算成功失败的接口,取得计算失败原因的接口,以及注册回调的接口。

这里涉及到两个考虑。其之一,计算不仅可以成功完成返回结果,还有可能失败。在 Java 的 Future 接口中,失败的原因需要通过捕获 get 接口的 ExecutionException 异常取得。虽然理论上都可以取得失败的原因,但是代码的流畅性却被打断了。嵌套的 try-catch 会很快击垮代码的逻辑层次,泄露底层实现的细节。其之二,通过 addListener 和 removeListener 两个接口支持注册在 Future 完成时回调的逻辑,Listener 的接口如下。

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

void operationComplete(F future) throws Exception;

}

其中的 operationComplete 接口将在 Future 完成时被回调,回调参数是 Future 的子类。 通常来说,我们只对结果有兴趣,因此回调的参数是成功或失败的类型,在 Scala 中是 Try 特质。 但是在 Netty 的设计中,Future 有 ChannelFuture 这个子类,从而可以从 Future 对象中取得 Channel 及其上下文进行进一步的网络操作,因此回调参数是 Future 的子类。

Guava 版本的 ListenableFuture

Guava 是 Google 开源的一个通用的 Java 类库,类似于 Apache 基金会的 Apache Commons 项目,其中自然也包含了用于并发的工具。Guava 的 Future 接口是 ListenableFuture 接口。

public interface ListenableFuture<V> extends Future<V> {

void addListener(Runnable listener, Executor executor);

}

这是一个非常通用的接口,实现了 Runnable 接口的 listener 实例会在 Future 完成之后被调用。 在这样的接口抽象下要想处理 Future 的结果只能手动捕获上下文的 Future 对象并调用 get 接口。 不同于 Netty 的实现,这里没有对计算成功完成返回结果和计算失败返回异常做出抽象,因此必须使用 try-catch 式的语法来分开处理。

当然,如前所述,大多数并发场景下我们只对 Future 的结果感兴趣,其实并没有自定义任意逻辑的需求。俗话说,让用户自定义所有逻辑,就是自己没有实现任何逻辑的懒政。Guava 作为业界领先的通用 Java 类库,当然不会对这样的情况熟视无睹。它提供了以下的静态方法来封装这部分逻辑。

public static <V> void addCallback(

final ListenableFuture<V> future,

final FutureCallback<? super V> callback,

Executor executor

) {

Preconditions.checkNotNull(callback);

future.addListener(new CallbackListener<V>(future, callback), executor);

}


public interface FutureCallback<V> {

void onSuccess(@Nullable V result);

void onFailure(Throwable t);

}

其中 CallbackListener 是一个内部类,负责实现上面提到的各种正确错误的处理。 调用方只需要实现一个流畅的接口 FutureCallback 即可,在接口中定义计算成功完成时处理结果的逻辑,以及计算失败时应对失败的逻辑。

虽然基于 Listener 的方式能够避免阻塞获取 Future 的值以及为了避免这一情况显式地创建新的异步任务,但是回调的方案有一个广为人知的问题,即所谓的回调地狱。在这里就不再举代码的例子,简单地说,为了描述复数个异步计算任务之间的依赖关系,需要在第一个 Future 的回调当中开启第二个计算,在第二个计算的的 Future 的回调中开启第三个计算,以此类推。很快,你的代码会陷入层层缩进当中,以至于迷失在嵌套的层次里面。

预计暑期的时候会讨论一下近期宣发迅猛的流式存储系统 Pravega 项目,其中就涉及到和很多 gRPC 交互以及 Future 相关的工程代码。有些处理方式还是很有典型性的,到时候再提。

话说回来,为了规避回调地狱,实际上类似于最初的阻塞等待到 Listener 回调的策略,我们并没有发明什么新的技术或思路,只是把常见的模式封装在更有表达力的流畅的接口当中,从而使得调用方能够流畅地描述自己的逻辑。

Java 1.8 版本的 ComplatableFuture

Java 1.8 版本引入了 CompletableFuture 对象,这个对象实现的 CompletionStage 接口定义了一系列的流畅的编程接口以支持灵活扩展异步计算。此外,CompletableFuture 支持手动设定计算的结果,这就实现了 Promise 写入的语义。此前,我们只能通过立即返回的 Callable 来绕过。总的来说,CompletableFuture 是一个 Future 和 Promise 语义兼具的重载对象。

CompletionStage 接口定义的方法超过 30 个,但其不同的语义却很少。

首先,它为每个不同语义的接口分别定义了三个重载方法,分别是同步方法,不传递 Executor 的异步方法和传递 Executor 的异步方法。其中,不传递 Executor 的异步方法等同于将 ForkJoinPool 的 commonPool 作为默认的 Executor 执行传递 Executor 的异步方法。异步方法好理解,将扩展的处理 Future 结果的函数在 Executor 中运行。同步方法比较微妙,通常开发者期望的是在依赖的 Future 在异步线程当中完成时延续其线程完成。实际上,如果在扩展异步计算时依赖的 Future 尚未完成,那么确实是这样的行为。但是如果依赖的 Future 已经完成,由于无法再得知它刚才是在哪个 Executor 上运行的,甚至该 Executor 已经被销毁,因此是在当前线程执行的。这有可能违反异步计算直觉导致死锁。通常建议永远使用异步方法,并尽可能指定 Executor 参数。

其次,在 Scala 中,同一个意群的扩展方法的函数参数都是唯一的。但在 Java 中,却首先按照是否有返回分为 Apply 和 Accept 两大类。另外,虽然扩展异步计算永远有上一步的依赖任务,但是或许是 Java 为了兼顾以前的 Runnable 方法的使用,还专门为它定制了 Run 一类的方法。

实际上,真正有意义的不同逻辑在 Java 中只有以下几种。

定义当前异步计算完成时对计算结果进行处理的函数,该方法签名如下。

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

定义两个异步计算中任意一个完成时处理先完成的计算结果的函数,该方法签名如下。

public <U> CompletionStage<U> applyToEither(

CompletionStage<? extends T> other,

Function<? super T, U> fn);

定义两个异步计算均完成时组合两个结果的函数,该方法签名如下。

public <U,V> CompletionStage<V> thenCombine(

CompletionStage<? extends U> other,

BiFunction<? super T,? super U,? extends V> fn);

定义当前异步计算完成时将结果作为参数传入一个返回 CompletionStage 的函数,该方法签名如下。

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

这个方法是理解 Future 是 Monad 的核心,因为它就是 Monad 的 bind 方法,这是 Monad 最核心的方法。 在这里我们可以简单理解成,许多函数在定义时就是为了异步调用,因此它们的签名返回值都实现了 CompletionStage 接口。 然而,很多时候我们组合异步计算逻辑,并不关心这里面套了几层计算,而是想拿到最终的结果。 如果没有这个方法,我们将会在 thenApply 组合的时候陷入越来越深的 CompletionStage 泛型嵌套,这就回到了回调地狱之中。 通过每次组合两个异步计算时均拆封成单层的 CompletionStage 封装,我们可以任意组合异步计算而无需担心泛型嵌套层数过深。

上面定义的方法均只在异步计算成功时有效,如果扩展的当前 Future 已经失败,那么调用这个方法也只会返回同样的失败结果。为了处理失败的结果,CompletionStage 提供了以下方法。

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

这个是专门用来处理失败结果的方法,仅此一个,甚至没有异步版本。 没有异步版本应该是个坑,但是可以看出其实核心的转换逻辑就是 Function 接口定义的有输入有输出的抽象。 上面提到的接受 Consumer 和 Runnable 类型参数的处理成功结果的方法,其实是冗余的。

除了没有异步结果,对比上面成功结果的方法,可以看到还差了一个组合传入函数返回 CompletionStage 的情形,这应当也是接口上的缺陷。Scala 中使用 recover 方法对应此处的 exceptionally 方法,recoverWith 方法对应这里提到的返回 CompletionStage 的情形。

当然,除了单独处理成功结果、单独处理失败结果的接口以外,还有支持同时处理这两种结果的接口。

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

可以看到,这里又根据传入函数有无返回值区分了不同的接口,这在概念上也是冗余的。 同时,也缺少了处理传入函数返回 CompletableStage 的情形。

值得一提的是,这里处理不同的情形是通过传入二元参数(结果,异常)来实现的,通过约定保证其中恰好有一个是空值(NULL)。这其实是 Go 里面异常处理的路数。这种方案会导致额外占用一个字段的空间,并且依赖约定而不是类型系统,实际上不是很坚固。

在 Scala 中,同类方法的接口如下。

def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit

可以看到,通过组合 Union 类型 Try 特质,结合上基于模式匹配的解析,Scala 能够简洁的处理成功和失败的不同逻辑。

Future 与其他模式

Future 能够简洁的表达异步计算,尤其是引入 CompletableFuture 之后能够流畅的组合异步计算的结果处理逻辑。因此,Future 模式被广泛应用在并发编程当中。

并发编程的重度依赖领域分布式系统也在相当程度上支持了这一模式。例如,对远端方法的调用在本地可以由 Future 来表达一个结果的占位符。这样,本地进行远端方法调用之后可以拿着 Future 作为占位符继续表达剩下的计算逻辑。当远端方法调用的结果返回时,这些后续逻辑都会被一一触发。同时,作为其他并发编程模式的补充,Future 常常被用于其他单次返回结果的延迟操作,例如访问本地数据库。

Future 模式与其他并发模式的交集有几个要点。

第一个,就是这里比较的 Future 与 CPS 的差别。CPS 通过回调来组合逻辑,实际上 CPS 并不需要一个结果的占位符,它主要关注的是不同计算模块之间的先后关系。当然,Future 也非常关注这个关系,并且现代的 Future Promise 模型基本不再使用回调的实现模式,从而使得概念能够清晰的区分出来。

第二个,是 Future 和 PubSub 的关系。PubSub 的实现很多,包括各种消息队列以及 Reactive 扩展等。以 Project Reator 为例,其 Mono 抽象实际上即是 Future 的同义词。PubSub 与 Future 的核心区别有两个,其之一是 PubSub 模型有显式的消息传递通道,其之二是 PubSub 模型支持多次写入结果值,并由订阅方依次消费。由于 Future 仅支持单次完成,在某些持续交流的场景下会有比较大的局限性。还是那句话,技术上 Future 可以表达任意的异步计算逻辑,但是表达的成本是不一样的。为了绕过 Future 表示单次完成的定义,强行支持用户侧的多次完成,必然带来二次开发的成本和复杂的实现细节。如果后续有空介绍 gRPC 的实现,可以对这一部分进行详细的讨论。

第三个,是 Future 和 Actor Model 之间的关系。

Actor Model 与 Future 的关系很有意思。大致上,Actor Model 的通信原语 tell 是没有返回值的,仅仅代表了一个 actor 向另一个 actor 发送消息。在 Actor Model 的实现 Akka 中提供了 ask 的原语,它就是通过返回一个 Future 来表达 ask 的结果。技术上,发起 ask 的 Actor 会创建一个专门的子 Actor 来接受回复消息,在 Akka 中由 PromiseActorRef 担任这个职责。这个 Actor 的地址被父 Actor 传递给 ask 的对象,或者在 Akka 的 Untyped 实现中由于 sender 惯例,由 PromiseActorRef 自己发送。ask 的对象完成请求后将结果发回给 PromiseActorRef,后者填充交回父 Actor 的 Future 的结果,从而刚才返回的 Future 触发完成逻辑。这就是 ask 底下的通信细节的一个简要介绍。另一方面,Akka 实现也大量复用了 Java/Scala 的并发工具来开发,当然也有用到 Future 并发模型的地方。

如果后续有机会讲到 Actor Model 的细节,例如 Akka 的实现或者 BEAM 的实现,也许可以对这部分再做详细的展开。

最后作为逸闻,Actor Model 的提出者 Carl Hewitt 有一篇论文《The Incremental Garbage Collection of Processes》介绍了如今 Future 模型的一系列定义。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章