Akka介绍
Akka是用scala编写的actor模型框架。它在使用中不需要锁和多线程,每个Actor在独立空间中进行数据操作,Actor之间并不直接通信,而是通过了消息来相互沟通,每一个Actor都把它要做的事情都封装在了它的内部,操作都是异步进行的。
理论上来讲,每一个Actor都拥有属于自己的轻量级线程,保护它不会被系统中的其他部分影响.因此,我们在编写Actor时,就不用担心并发的问题, 通过Actor能够简化锁以及线程管理.
它可以用于高并发、分布式场景,需要注意的是,Akka消息的传递不保证绝对可靠投递,当然这带来了好处是整个实现简单.
Actor具有以下的特性
- 提供了一种高级的抽象,能够封装状态和操作.简化并发应用的开发.
- 提供了异步的非阻塞的/高性能的事件驱动模型
- 超级轻量级的线程事件处理能力.
概念认知
在了解Akka前先了解几个概念
并发&并行
并发: 指的是两个或多个任务都有进展,即使他们没有被同时执行。例如可以这样实现:划分出时间片,几个任务交叉执行,尽管时间片的执行是线性的。
并行: 指可以真正同时执行。
异步&同步
所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
阻塞&非阻塞
阻塞调用是指调用结果返回之前,当前线程会被挂起。函数只有在得到结果之后才会返回。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回
Akka模块介绍
- akka-actor:最核心的依赖包,里面实现了Actor模型的大部分东西
- akka-agent:代理/整合了Scala中的一些STM特性
- akka-camel:整合了Apache的Camel
- akka-cluster:akka集群依赖,封装了集群成员的管理和路由
- akka-kernel:akka的一个极简化的应用服务器,可以脱离项目单独运行.
- akka-osgi:对OSGI容器的支持,有akka的最基本的Bundle
- akka-remote:akka远程调用
- akka-slf4j:Akka的日志事件监听
- akka-testkit:Akka的各种测试工具
- akka-zeromq:整合ZeroMQ
其中最总要的就是akka-actor,最简单的AKKA使用的话,只需要引入这个包就可以了.
建议一个Akka系统的步骤
- 建立角色系统,ActorSystem;
- 定义角色,actor;
- 注册角色,actorSystem.actorOf;
- 消息的传递,tell;
- 在actor内消息的处理(处理完后,也可通过actorSelect路径选择其它actor,并发送消息)
相关模块说明
- 角色系统
ActorSystem
在akka中,所有角色都需在角色系统中,可以通过ActorSystem.create方法建立,同时可以指定角色系统名称,这会在actor路径中体现出来;
- 角色(Actor)
通过继承AbstractActor类,实现抽象方法createReceive,并在此方法定义消息处理模块(使用reciveBuilder.build进行构造), 接收消息(消息类型,其实也就是消息的class类型区分消息),并进行消息处理;
- 角色(Actor)注册/创建
通过ActorSystem的actorOf方法创建actor,创建时可以通过new BalancingPool(3)指定该actor的工作worker数量,以及actor的名称,这会在路径中体现出来。
- 角色(Actor)路径及查找
Akka内的各个actor采用类似于分布式文件系统(如hdfs)进行管理,actor类似于文件系统的各个文件。
Akka主要根路径有三个:/(根路径),/system(系统相关的路径),/user(用户路径,通常定义的actor在此路径下)。
有了路径,可以通过getContext().actorSelect()方法基于路径,获取到相关actor引用,进而发送消息.
/user: 守护Actor
这个名为”/user”的守护者,作为所有用户创建actor的父actor,可能是需要打交道最多的。使用system.actorOf()创建的actor都是其子actor。这意味着,当该守护者终止时,系统中所有的普通actor都将被关闭。同时也意味着,该守护者的监管策略决定了普通顶级actor是如何被监督的.
/system: 系统守护者
这个特殊的守护者被引入,是为了实现正确的关闭顺序,即日志(logging)要保持可用直到所有普通actor终止,即使日志本身也是用actor实现的。其实现方法是:系统守护者观察user守护者,并在收到Terminated消息初始化其自己的关闭过程。顶级的系统actor被监管的策略是,对收到的除ActorInitializationException和ActorKilledException之外的所有Exception无限地执行重启,这也将终止其所有子actor。所有其他Throwable被上升,然后将导致整个actor系统的关闭。
/: 根守护者
根守护者所谓“顶级”actor的祖父,它监督所有在Actor路径的顶级作用域中定义的特殊actor,使用发现任何Exception就终止子actor的SupervisorStrategy.stoppingStrategy策略。其他所有Throwable都会被上升……但是上升给谁?所有的真实actor都有一个监管者,但是根守护者没有父actor,因为它就是整个树结构的根。因此这里使用一个虚拟的ActorRef,在发现问题后立即停掉其子actor,并在根守护者完全终止之后(所有子actor递归停止),立即把actor系统的isTerminated置为true。
Akka监管策略
Akka中有两种类型的监管策略:OneForOneStrategy 和AllForOneStrategy.
两者都配置有从异常类型监管指令间的映射(见上文),并限制了一个孩子被终止之前允许失败的次数。它们之间的区别在于,前者只将所获得的指令应用在发生故障的子actor上,而后者则是应用在所有孩子上。通常情况下,你应该使用OneForOneStrategy,这也是默认的策略。
Akka消息传递机制
有三种基本类型:
- 至多一次投递的意思是对该机制下的每条消息,会被投递0或1次;更随意的说法就是,它意味着消息可能会丢失。
- 至少一次投递的意思对该机制下的每条消息,有可能为投递进行多次尝试,以使得至少有一个成功;更随意的说法就是,消息可能重复,但不会丢失。
- 恰好一次投递的意思对该机制下的每条消息,接收者会正好得到一次投递;消息既不能丢,也不会重复。
第一种是最廉价的——性能最高,实现开销最少——因为它可以用打后不管(fire-and-forget)的方式完成,不需要在发送端或传输机制中保留状态。第二种方式要求重试来对抗传输丢失,这意味着需要在发送端保持状态,并在接收端使用确认机制。第三种是最昂贵的——并因此表现最差——因为除了需要第二种方式的机制以外,还需要在接收端保持状态,以过滤重复的投递.
Akka入门示例
Maven依赖
1 | <dependency> |
定义Actor
1 | public class HelloActor extends AbstractLoggingActor { |
程序主类
1 | public class HelloMain { |
程序解读
- 创建一个Actor系统,命名为”helloSystem”
- 每个Actor对外封闭,使用ActorRef的actorOf方法创建Actor,示例创建了一个名为”helloActor”的Actor,绑定HelloActor事件处理.
- 发送消息给”helloActor”,tell()方法是异步的,它只给Actor的邮箱放一封邮件,然后就返回了。tell()方法的第一个参数是消息,第二个参数是发送者,这样接收者Actor就知道是谁给自己发的消息了. 示例中在主方法中发送消息,所以首次消息发送者为
ActorRef.noSender()
.
执行输出
1 | Hello, World! |
Akka消息传递
示例使用Akka实现一个MapReduce的经典示例WordCount
Maven依赖
1 | <dependency> |
工程结构
1 | ├── pom.xml |
消息定义
1 | public class Message { |
Actor定义
InputActor
定义
1 | public class InputActor extends AbstractLoggingActor{ |
MapperActor
定义
1 | public class MapperActor extends AbstractLoggingActor{ |
ReduceActor
定义
1 | public class ReduceActor extends AbstractLoggingActor{ |
OutputActor
定义
1 | public class OutputActor extends AbstractLoggingActor { |
WcApplication
测试主类定义
1 | public class WcApplication { |
执行输出
1 | [INFO] [01/29/2018 12:00:27.089] [wordCountSystem-akka.actor.default-dispatcher-3] [akka://wordCountSystem/user/inputActor] [事件][数据录入]: Akka Map Reduce Demo |