Akka Typed系列:协议&行为 原 荐

引言

2019年11月6号LightBend公司发布了AKKA 2.6版本,带来了类型安全的actor,新的Akka Cluster底层通信设施——Artery,带来了更好的稳定性,使用Jackson进行消息序列化,支持SLF4J日志接口。Akka Typed与之前的经典actor编程模式有较大的不同,本文翻译自Manuel Bernhardt——Akka技术推广大使,在2019年7月发布的系列文章: Tour of Akka Typed: Protocols and Behaviors ,文中的示例代码原是scala,考虑到scala普及程度不高,译文全部转成java代码。

本系列课程我们一起来探索Akka Typed,新的Akka Actor API显著优于经典的Actor API。其实Akka Typed早在4月份就已经可以用于生产环境了,但是API还是被标记为可能会改变,随着2.6正式版发布日期的临近,抢先看一下带来了哪些新的变化。 如果你对之前的Akka不熟悉,不用担心,保证你能看懂;如果你对Akka很熟悉,也不要飘飘然,本课程可以帮助你在实际工作中更好的掌握Akka Typed。

为什么使用Akka Typed

actor编程模型是一个强有力的抽象模型,尤其擅长解决真实世界建模,容错、并发、分布式系统问题。actor抽象编程模型构建于在互相独立的actor之间发送消息的基础之上,actor可以创建子actor,并负责监管,当子actor出现错误的时候可以重启或者重新创建,这套容错机制给整个actor系统带来了自愈能力。

经典的Akka actor API非常简单,就是一组接受并处理消息的函数

package puffin;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;

// 继承AbstractActor即可使用Actor API
public class OrderProcessor extends AbstractActor {
    @Override
    public Receive createReceive() {
        return createReceive().onMessage(OrderProcessor.ProcessOrder.class, order -> {
            ActorRef connection = context().actorOf(BankConnection.props(order.bankIdentifier));
            connection.tell(BankConnection.ExecuteOrder(order));
        }).build();
    }
}

这种编程模型和API在多线程环境中具有显著的优势,每个actor顺序处理接收到的消息,actor的内部状态也只有它本身可以修改,这比并发的修改共享状态容易多了。

天下没有免费的午餐,actor编程模型也有它的缺点,槽点在这篇文章中有提到: Akka anti-patterns series

这些年来我在一些稍微大一些的Akka工程中见到的最大的问题是actor系统随着业务越做越大,并且非常难以扩展。根本原因是这套Akka API没有强制用户采用“协议优先”的规范。实际上Akka官方教程里最先讲述的就是清晰的定义组件之间的通信协议(也就是消息),并使用全路径访问消息。已上面的例子来说, OrderProcessor 的通信协议定义如下:

// scala中的message定义多使用伴生对象
// java中通常使用static类来定义message
// 集群环境中messages需要支持序列化,如采用protobuf定义
public interface Command{}

class ProcessOrder implements Command {
    BankId bank;
    AccountId fromAccount;
    AccountId toAccount;
    Amount amount;
}

即便你遵照Akka最佳实践,但还是无法保证给actor发送一些它不支持消息,actor的 receive 方法会接受任意类型的消息,当它收到不支持的消息时,便自动转给 unhandled 方法,此方法默认只会打日志记录一下(需要正确的配置日志打印机制),这对新人来说太坑了,你找不到任何错误,但是系统就是无法正常工作。

更深层次的原因在于缺少一种机制来帮助我们维护actor之间的通信协议。随着消息类型增多,很容易忘记这些actor都支持什么类型的消息。通过单元测试和严格的日志级别会有助于缓解这种问题(只要接受到不支持的消息就打warn日志),但是仍然无法完全避免。

Akka Typed就是为了解决这个问题,新的API是为“协议优先”设计的,在实现功能之前,你必须花一点时间想一想每一个actor要处理哪些消息。经典的Actor API的最佳实践也是如此,但却是可选的,你需要在实现的过程中使要处理消息条理清晰。

看过许多真实的Akka System分享之后,有一点必须强调一下:开发Akka Typed的目的不仅仅是为了以结构化的方式组织消息以及防止丢失那一点点actor不支持的消息,它的主要目的是引导我们优先考虑系统设计。设计一组恰到好处的actor,适当的通信粒度,正确的消息模式,这样就可以构建一个强大的系统,但是它的核心却非常简单,就像高考一样简单。但是我见到太多过度设计,大家倾向于设计过多的actor以及消息,引入了不必要的复杂度,最后尾大不掉,Martin Thompson曾经这样评价微软的WSL:

WSL越做越好,一个双用途的机器呼之欲出。

Loving how Windows Subsystem for Linux (WSL) keeps getting better. A dual purpose machine is almost there.

构建一个处理支付业务的系统

本系列教程以支付系统为例进行讲解,这个领域的业务知识永远不会过时,而且我刚好在这方面有很多经验,Akka也非常适合构建高吞吐,低延迟的交易系统。

我们的支付系统将支持多种支付方式:各种信用卡(Visa、MasterCard)以及Apple Pay、PayPal、支付宝、微信等你能想到的都给它整上。每种支付形式都会有不同的校验逻辑,比如重复支付等问题。

为了支撑多种更多样的支付方式,我们的系统切分为一下几个部分,便于动态添加新的支付方式:

  1. API 系统入口,负责认证,接收多种格式的请求,转发给对应的输出组件。本教程会采用非常简单的实现。

  2. Payment Handler 系统核心,根据请求参数从配置组件获取具体的处理器,并控制整个支付流程,如验证、执行等。

  3. Configuration 存储API用户和可用的支付方式关系(契约)

  4. Payment processors 它们负责处理具体的支付逻辑,真实系统中会包含很多支付方式,在这里我们以简单的信用卡支付为例,它通常会需要调用其它组件或者第三方系统才能完成支付逻辑,但为了简单起见,我们不考虑这些外部依赖。

真实的系统中可能会有更多的关注点,比如支付方法的注册逻辑,但是就学习Akka Typed而言,上面列举的业务知识已经足够了。

Akka Typed 定义协议

前面我们已经讲过使用Akka Typed可以非常容易的定义协议,但什么是“协议”呢?协议仅仅是“消息”吗?简单来说协议就是:定义一组消息,在两个及以上的组件之间按特定的顺序和组合传递。常见的协议有TCP、HTTPS等,而我们定义的是应用层的协议。你可以认为协议就是增强版的API:API只定义了个体之间的调用格式(参数、请求内容、响应内容等),协议描述了怎么通过组件之间的相互调用使系统到达期望的状态。

在Akka Typed API中,协议由一组消息class和对应类型的actor组成。下面的例子展示了从configuration组件获取配置数据的协议:

import akka.actor.typed.ActorRef;

public interface ConfigurationMessage {
    class RetrieveConfiguration implements ConfigurationMessage {
        public MerchantId merchantId;
        public ActorRef<ConfigurationResponse> replyTo;

        public RetrieveConfiguration(MerchantId merchantId, ActorRef<ConfigurationResponse> replyTo) {
            this.merchantId = merchantId;
            this.replyTo = replyTo;
        }
    }
}

public interface ConfigurationResponse {
    class ConfigurationFound implements ConfigurationResponse {
        public MerchantId merchantId;
        public MerchantConfiguration merchantConfiguration;

        public ConfigurationFound(MerchantId merchantId, MerchantConfiguration merchantConfiguration) {
            this.merchantId = merchantId;
            this.merchantConfiguration = merchantConfiguration;
        }
    }

    class ConfigurationNotFound implements ConfigurationResponse {
        public MerchantId merchantId;
        public ConfigurationNotFound(MerchantId merchantId) {
            this.merchantId = merchantId;
        }
    }
}


public class MerchantId {
    public String id;
    public MerchantId(String id) {
        this.id = id;
    }
}

public class BankIdentifier {
    public String id;
    public BankIdentifier(String id) {
        this.id = id;
    }
}

public class MerchantConfiguration {
    public BankIdentifier bankIdentifier;
    public MerchantConfiguration(BankIdentifier bankIdentifier) {
        this.bankIdentifier = bankIdentifier;
    }
}

这个例子遵循了请求-响应的消息设计模式,欲知更多详情,请参见本书: Reactive Design Patterns

如果你以前用过经典的Actor API,你会发现这里的实现方式有两个不同的地方,第一个是消息发送者的引用包含在消息的定义中,经典的Actor API是通过Akka提供的 sender() 方法来获取发送者的。第二个是消息class中包含的 ActorRef 是有类型的,发送者使用它的时候就可以清楚的知道应该发送什么类型的消息。我们使用接口 ConfigurationResponse 定义了配置数据的返回格式,它有两个实现类,这样发送者就可以发送两种格式的消息。

看了Actor的定义之后,就能理解为什么Akka Typed比经典的Actor更容易且更安全的解决协议问题, Configuration 的定义如下:

import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;

public class Configuration extends AbstractBehavior<ConfigurationMessage> {

    private Configuration(ActorContext<ConfigurationMessage> context) {
        super(context);
    }
    // ...
}

我们定义的actor继承 AbstractBehavior ,并带有指定的类型,它只能处理 ConfigurationMessage 类型的消息,编译器可以帮助我们检查消息的发送者发送的消息是否正确。

上面的例子中我们使用面向对象的编程方式定义了Actor,稍后我们会展示函数式编程风格。

完成第一个强类型的actor

Configuration 提供查询功能:根据商户Id查询支付方式。我们继续使用面向对象的编程方式,如果使用过经典的Akka API,你对这种使用方式应该非常熟悉。

继承 AbstractBehavior 就必须实现 onMessage 方法,它返回一个 Behavior

import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;

import java.util.HashMap;
import java.util.Map;

// AbstractBehavior 是面向对象风格的切入点
public class Configuration extends AbstractBehavior<ConfigurationMessage> {

    public static Behavior<ConfigurationMessage> create() {
        return Behaviors.setup(context -> new Configuration(context));
    }

    private Configuration(ActorContext<ConfigurationMessage> context) {
        super(context);
    }

    // 存储商户Id和支付方式的配置信息
    private Map<MerchantId, MerchantConfiguration> configurations = new HashMap<>();

    @Override
    public Receive<ConfigurationMessage> createReceive() {
        // 最后返回下次接受消息对应的行为
        // 这里简单的返回当前行为即可
        return newReceiveBuilder().onMessage(ConfigurationMessage.RetrieveConfiguration.class, retrieveConfiguration -> {
            MerchantId id = retrieveConfiguration.merchantId;
            MerchantConfiguration configuration = configurations.get(id);
            if (configuration != null) {
                // 使用异步通知的方式发送配置数据给请求者
                retrieveConfiguration.replyTo.tell(new ConfigurationResponse.ConfigurationFound(id, configuration));
            } else {
                retrieveConfiguration.replyTo.tell(new ConfigurationResponse.ConfigurationNotFound(id));
            }
            // 最后返回下次接收消息对应的行为
            // 这里简单的返回当前行为即可
            return this;
        }).build();
    }
}

这个actor与我们在本文开头使用经典的actor API定义的actor非常相似:覆盖 onMessage 方法,并根据指定的消息类型做出对应的响应。

不同点在于 onMessage 对应的方法返回的是一个 Behavior ,一个actor接收到消息之后的行为包含如下3个步骤:

  1. 发送一条或多条消息给其他的actor
  2. 创建子acotr
  3. 返回一个新的行为,准备接收下一个消息

在Akka Typed API中,一个 Behavior 即代表了处理当前消息的行为,也表明了如何处理下一个消息——通过返回一个新的 Behavior 。也可以只是返回当前行为(就像上面的例子一样),因为使用面向对象风格的actor继承自 AbstractBehavior ,它本身就是一个 Behavior ,所以可以使用 return this

本系列教程后面会讨论更多关于 Behavior 的用法,使用Akka Typed API定义的actor的一个优点就是非常容易组合和测试。

Typed Akka TestKit 可以帮助你轻而易举的对actor进行测试:

import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import org.junit.ClassRule;
import org.junit.Test;
import puffin.Configuration;
import puffin.ConfigurationMessage;
import puffin.ConfigurationResponse;
import puffin.MerchantId;

import static junit.framework.TestCase.assertEquals;

public class ConfigurationTest {

    @ClassRule
    public static final TestKitJunitResource testKit = new TestKitJunitResource();

    @Test
    public void test() {
        // 定义一个测试探针
        TestProbe<ConfigurationResponse> testProbe = testKit.createTestProbe();
        ActorRef<ConfigurationMessage> configurationActor = testKit.spawn(Configuration.create());
        MerchantId unknownMerchantId = new MerchantId("unknown");
        // 发送一条测试消息,发送者为测试探针
        configurationActor.tell(new ConfigurationMessage.RetrieveConfiguration(unknownMerchantId, testProbe.getRef()));
        ConfigurationResponse.ConfigurationNotFound response = testProbe.expectMessageClass(ConfigurationResponse.ConfigurationNotFound.class);
        assertEquals(response.merchantId, unknownMerchantId);
    }
}

acotr的监管

Actor System为actor提供运行环境、分配资源、基础设施。在这个系统中,每一个actor都有一个父actor,最顶层的actor叫做根节点( root ),使用 / 代表,它的两个直接子actor是 /user/system/user 用于在用户空间创建子actor, /system 属于akka系统内部管理,所以我们创建的所有的actor都从属于 /user

Akka Typed与经典的Actor API有一个非常重要的不同点: /user 的处理逻辑。在经典的Akka API中,Akka提供的 /user actor负责监管一切;但是Akka Typed把这个权力交给了用户。也就是说应用程序的开发者在实现actor的时候同时也必须多考虑一下actor都会有哪些行为。

在创建 Configuration actor的时候,我们大可以直接把它传给 ActorSystem 并把它作为监管者,但当创建更多actor的时候,这些actor全部都由 Configuration actor监管就不合适了。而且在actor模型中父监管机制采用级联的方式处理actor失败的问题:父actor负责决定如何处理子actor(当它抛异常的时候),因此如何对actor分组直接影响了监管策略。同样的我们应该使用一个专用的父actor做为监管actor,由它来决定如何处理子actor的失败问题。Akka Typed API中默认的监管策略是停止失败的子actor(经典的Akka API是重启)。由我们指定监管actor可以开发更灵活的监管策略,根据不同的异常做出相应的决策。综上所述我们决定使用 PaymentProcessor actor做为所有actor的监管者,actor层级如下图所示:

PaymentProcessor 的功能目前非常简单,启动的时候创建一个子actor—— Configuration ,它是无状态的,也不接收任何消息,这次我们使用函数式编程的风格,无需继承任何接口,只需要返回一个 Behavior

import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;

public class PaymentProcessor {
    public static Behavior<Void> create() {
        return Behaviors.setup(context -> {
            context.getLog().info("Typed Payment Processor started");
            context.spawn(Configuration.create(), "config");
            return Behaviors.empty();
        });
    }
}

Behaviors.setup() 方法是创建 Behavior 的入口,该方法包含一个 ActorContext 变量,我们用它打日志,记录actor已经启动,并使用 spawn() 方法创建了一个 Configuration actor,第一个参数用于创建actor,第二个参数是actor的名字,它在actor路径中是 /user/config

因为 PaymentProcessor 不处理任何消息,所以这里使用了 Behavior<Void>

Configuration actor使用静态的create函数创建 Behavior

public static Behavior<ConfigurationMessage> create() {
        return Behaviors.setup(context -> new Configuration(context));
    }

现在万事俱备,只欠东风,需要启动 ActorSystem 来创建我们的监管actor。Akka提供了静态方法用来创建监管actor:

import akka.actor.typed.ActorSystem;

public class Main {
    public static void main(String[] args) {
        ActorSystem<Void> actorSystem = ActorSystem.create(PaymentProcessor.create(), "typed-payment-processor");
    }
}

搞定!现在运行 Main 方法,就可以看到 PaymentProcessor 启动了:

[2019-11-24 18:24:41,269] [INFO] [puffin.PaymentProcessor] [typed-payment-processor-akka.actor.default-dispatcher-3] [akka://typed-payment-processor/user] - Typed Payment Processor started

欲知后事如何,且听下回分解。

Akka2.6前后比较

每篇文章的最后,我们都会有一个小表格对比经典的Akka API和Akka Typed API的不同之处,借助你对经典Akka API的理解可以更快的掌握Akka Typed API。

经典的Akka API Akka Typed API
ActorRef ActorRef<T>
extends Actor extends AbstractBehavior<T> (面向对象风格)
context.actorOf context.spawn()
Akka提供监管actor 用户自定义一个 Behavior 传给 ActorSystem 作为监管actor
默认的监管策略:重启失败的actor 默认的监管策略:停止子actor
我来评几句
登录后评论

已发表评论数()

相关站点

热门文章