苦涩的技术我该怎么学?Akka 实战

关注“一猿小讲”公众号的小伙伴都清楚,上次《 技术再深入一点又何妨?一脸懵B的聊Actor 》,我们在“懵 B”的状态下,聊了聊 Actor 模型的理论知识。稍微再补充两句,如上图所示在 Actor 模型系统中,主要有互不依赖的 Actor 组成(图中圆圈),Actor 之间的通信是通过消息来实现的,其中每个 Actor 都有一个 MailBox 来存储接收到的消息,每个 Actor 都维护着自己的状态。

说实话,聊 Actor 模型其实有点醉翁之意不在酒,项庄舞剑意在 Akka。

Actor 模型还有点懵 B,又出来个Akka,这又是个什么玩意儿?估计你心里一直在犯嘀咕。

好了,不闲扯,请准备好小板凳,我们的分享开始。

1.

Akka 是啥?

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。它已经成功运用在电信行业。系统几乎不会宕机(高可用性 99.9999999 % 一年只有 31 ms 宕机)。Akka 是 JAVA 虚拟机 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时。Akka 用 Scala 语言写成,同时提供了 Scala 和 JAVA 的开发接口—— 摘自百度百科。

上面是度娘的答复,那咱们尝试用自己的话回答一下,其实 Akka 是 Actor 模型的一种实现,是一个用来开发支持并发、容错、扩展性的应用程序框架,So Easy!!

2.

Akka 咋用?

好了,让我们抛开苦涩的概念,让我们写一个超级简单的 HelloWorld 了解一下 Akka 的魅力吧。

先定一下 HelloWorld 完成的目标:实现简易的WordCount,按照空格拆分一句话,并统计每个单词出现的次数。

第一步:让我们从全局认识一下,我们要用 Akka 生撸的 HelloWorld(看到截图不得不说是相当之简单)

第二步:引入依赖,考虑到方便你本机测试,遂把依赖 copy 一份给你。

<dependencies>    
    <dependency>        
        <groupId>com.typesafe.akka</groupId>        
        <artifactId>akka-actor_2.12</artifactId>        
        <version>2.6.0-M5</version>    
    </dependency>
</dependencies>复制代码

第三步:伏笔买了这么久,终于可以施展代码了。

来自于内心的疑问:Actor 与 Actor 之间通过消息进行通讯,那么用于传输的消息实体该如何定义?

来自于灵魂的碰撞 1:拆分一句话的 Actor,该如何定义?

来自于灵魂的碰撞 2:统计每个单词出现次数的 Actor,该如何定义?

来自内心 + 灵魂的发问:该如何攒到一起?

第四步:代码写完了,是该一览尊荣的时候了。

效果达到预期,杠杠滴!为了体现我是一个负责任的分享者,还是需要把全部代码分享出来,以便你快速上手(捂嘴笑)。

import akka.actor.*;
import java.io.IOException;
import java.util.*;

public class WordCountAkka {

    public static void main(String[] args) {
        //1、创建Actor系统,名字为wordcount
        ActorSystem actorSystem = ActorSystem.create("wordcount");
        try {
            //2、创建SplitActor,用于拆分每行的单词
            ActorRef splitActor = actorSystem.actorOf((Props.create(SplitActor.class)), "SplitActor");
            //2.1、创建CountActor,用于统计单词的次数
            ActorRef countActor = actorSystem.actorOf((Props.create(CountActor.class)), "CountActor");

            //3、创建消息
            //TODO 接收的消息串,可以修改为从控制台输入,本次就直接写死了
            Message msg = new Message("Hello Akka Akka Hello");
            //4、给SplitActor发消息
            splitActor.tell(msg, ActorRef.noSender());

            //5、按回车退出应用
            System.out.println(">>> Press ENTER to exit <<<");
            System.in.read();
        } catch (IOException e) {
        } finally {
            actorSystem.terminate();
        }
    }

    /**
     * 定义 SplitActor 用于拆分每行的单词
     */
    static class SplitActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder().match(Message.class, t -> {
                System.out.println(self() + "  收到来自于 " + sender() + " 的消息: " + t);
                //按照空格拆分数据
                String[] words = String.valueOf(t.getContent()).toLowerCase().split("\\W+");
                //封装消息请求给CountActor
                Message msg = new Message(words);
                System.out.println(self() + "  发送消息 : " + Arrays.toString(words));
                //根据路径查找下一个处理者
                ActorSelection countActorRef = getContext().actorSelection("/user/CountActor");
                //将消息发给下一个处理者CountActor
                countActorRef.tell(msg, self());
            }).build();
        }
    }

    /**
     * 定义 CountActor 用于统计每个单词出现的次数
     */
    static class CountActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder().match(Message.class, t -> {
                //收到消息
                String[] words = (String[]) t.getContent();
                System.out.println(self() + " 收到来自于 " + sender() + " 的消息: " + Arrays.toString(words));

                //统计处理
                Map conutMap = new HashMap<>();
                for (String word : words) {
                    Integer num = conutMap.get(word);
                    conutMap.put(word, num == null ? 1 : num + 1);
                }
                System.out.println(self() + " 每个单词出现次数的统计结果为 : " + conutMap);
            }).build();
        }
    }

    /**
     * 定义消息
     */
    static class Message {

        private Object content;

        public Message(Object content) {
            this.content = content;
        }

        public Object getContent() {
            return content;
        }

        public void setContent(Object content) {
            this.content = content;
        }
    }
}复制代码

其实代码中的注释,已经写的非常之清晰了,但是为了让你更清晰明了,我还是稍微再总结 Akka 的代码研发流程。

1、采用 ActorSystem.create("wordcount") 创建一个名字为 wordcount 的 Actor 系统;
2、定义 XxActor extends AbstractActor,实现 createReceive() 方法完成业务逻辑处理;
3、通过 actorSystem.actorOf((Props.create(SplitActor.class)), "SplitActor") 创建业务逻辑处理的 Actor;
4、通过 getContext().actorSelection("/user/CountActor") 选择下一个逻辑处理的 Actor;
5、采用 countActorRef.tell(msg, self()) 来发送消息。复制代码

3.

好了,结合本次的 Akka 分享 + 上次的 Actor 模型的分享,你多多少少应该对 Actor 模型有点概念了吧。相信通过这两次的分享,我们再去深入架构源码,虽谈不上平步青云,但是也会好风凭借力送你上青云(捂嘴笑)。

调皮的我又找一张 flink 运行时的架构图,你有没有发现 Actor System 担任了整个架构通讯的角色啊!!!!

4.

最后,主要想说一下,授人以鱼不如授人以渔,尝试结合个人在面对新技术时的一个研究思路,先从整体上了解个梗概,然后再逐个了解七七八八,遇到没见过的新名词、新技术不要放弃,暂时屏蔽,当时抽取 Resin 核心源码就如此,见到 Actor 字眼,当时也没深究,到现在才开始了解 Actor ,但是当时抽取的项目架构已在业务上平稳运行,所以遇到不懂的,莫阻碍全局,到秋后再(补)算账,也未尝不是一个好的方式方法。其实核心思想说简单点就是「时间紧,任务重,先出活,其它都白扯!」

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章