Akka介绍与入门示例

Akka介绍

Akka是用scala编写的actor模型框架。它在使用中不需要锁和多线程,每个Actor在独立空间中进行数据操作,Actor之间并不直接通信,而是通过了消息来相互沟通,每一个Actor都把它要做的事情都封装在了它的内部,操作都是异步进行的。
理论上来讲,每一个Actor都拥有属于自己的轻量级线程,保护它不会被系统中的其他部分影响.因此,我们在编写Actor时,就不用担心并发的问题, 通过Actor能够简化锁以及线程管理.
它可以用于高并发、分布式场景,需要注意的是,Akka消息的传递不保证绝对可靠投递,当然这带来了好处是整个实现简单.

Actor具有以下的特性

  • 提供了一种高级的抽象,能够封装状态和操作.简化并发应用的开发.
  • 提供了异步的非阻塞的/高性能的事件驱动模型
  • 超级轻量级的线程事件处理能力.

概念认知

在了解Akka前先了解几个概念

并发&并行

并发: 指的是两个或多个任务都有进展,即使他们没有被同时执行。例如可以这样实现:划分出时间片,几个任务交叉执行,尽管时间片的执行是线性的。
并行: 指可以真正同时执行。

异步&同步

所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

阻塞&非阻塞

阻塞调用是指调用结果返回之前,当前线程会被挂起。函数只有在得到结果之后才会返回。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回

Akka模块介绍

  • akka-actor:最核心的依赖包,里面实现了Actor模型的大部分东西
  • akka-agent:代理/整合了Scala中的一些STM特性
  • akka-camel:整合了Apache的Camel
  • akka-cluster:akka集群依赖,封装了集群成员的管理和路由
  • akka-kernel:akka的一个极简化的应用服务器,可以脱离项目单独运行.
  • akka-osgi:对OSGI容器的支持,有akka的最基本的Bundle
  • akka-remote:akka远程调用
  • akka-slf4j:Akka的日志事件监听
  • akka-testkit:Akka的各种测试工具
  • akka-zeromq:整合ZeroMQ

其中最总要的就是akka-actor,最简单的AKKA使用的话,只需要引入这个包就可以了.

建议一个Akka系统的步骤

  1. 建立角色系统,ActorSystem;
  2. 定义角色,actor;
  3. 注册角色,actorSystem.actorOf;
  4. 消息的传递,tell;
  5. 在actor内消息的处理(处理完后,也可通过actorSelect路径选择其它actor,并发送消息)

相关模块说明

  • 角色系统ActorSystem

在akka中,所有角色都需在角色系统中,可以通过ActorSystem.create方法建立,同时可以指定角色系统名称,这会在actor路径中体现出来;

  • 角色(Actor)

通过继承AbstractActor类,实现抽象方法createReceive,并在此方法定义消息处理模块(使用reciveBuilder.build进行构造), 接收消息(消息类型,其实也就是消息的class类型区分消息),并进行消息处理;

  • 角色(Actor)注册/创建

通过ActorSystem的actorOf方法创建actor,创建时可以通过new BalancingPool(3)指定该actor的工作worker数量,以及actor的名称,这会在路径中体现出来。

  • 角色(Actor)路径及查找

Akka内的各个actor采用类似于分布式文件系统(如hdfs)进行管理,actor类似于文件系统的各个文件。

Akka主要根路径有三个:/(根路径),/system(系统相关的路径),/user(用户路径,通常定义的actor在此路径下)。

有了路径,可以通过getContext().actorSelect()方法基于路径,获取到相关actor引用,进而发送消息.

/user: 守护Actor

这个名为”/user”的守护者,作为所有用户创建actor的父actor,可能是需要打交道最多的。使用system.actorOf()创建的actor都是其子actor。这意味着,当该守护者终止时,系统中所有的普通actor都将被关闭。同时也意味着,该守护者的监管策略决定了普通顶级actor是如何被监督的.

/system: 系统守护者

这个特殊的守护者被引入,是为了实现正确的关闭顺序,即日志(logging)要保持可用直到所有普通actor终止,即使日志本身也是用actor实现的。其实现方法是:系统守护者观察user守护者,并在收到Terminated消息初始化其自己的关闭过程。顶级的系统actor被监管的策略是,对收到的除ActorInitializationException和ActorKilledException之外的所有Exception无限地执行重启,这也将终止其所有子actor。所有其他Throwable被上升,然后将导致整个actor系统的关闭。

/: 根守护者

根守护者所谓“顶级”actor的祖父,它监督所有在Actor路径的顶级作用域中定义的特殊actor,使用发现任何Exception就终止子actor的SupervisorStrategy.stoppingStrategy策略。其他所有Throwable都会被上升……但是上升给谁?所有的真实actor都有一个监管者,但是根守护者没有父actor,因为它就是整个树结构的根。因此这里使用一个虚拟的ActorRef,在发现问题后立即停掉其子actor,并在根守护者完全终止之后(所有子actor递归停止),立即把actor系统的isTerminated置为true。

Akka监管策略

Akka中有两种类型的监管策略:OneForOneStrategy 和AllForOneStrategy.

两者都配置有从异常类型监管指令间的映射(见上文),并限制了一个孩子被终止之前允许失败的次数。它们之间的区别在于,前者只将所获得的指令应用在发生故障的子actor上,而后者则是应用在所有孩子上。通常情况下,你应该使用OneForOneStrategy,这也是默认的策略。

Akka消息传递机制

有三种基本类型:

  • 至多一次投递的意思是对该机制下的每条消息,会被投递0或1次;更随意的说法就是,它意味着消息可能会丢失。
  • 至少一次投递的意思对该机制下的每条消息,有可能为投递进行多次尝试,以使得至少有一个成功;更随意的说法就是,消息可能重复,但不会丢失。
  • 恰好一次投递的意思对该机制下的每条消息,接收者会正好得到一次投递;消息既不能丢,也不会重复。

第一种是最廉价的——性能最高,实现开销最少——因为它可以用打后不管(fire-and-forget)的方式完成,不需要在发送端或传输机制中保留状态。第二种方式要求重试来对抗传输丢失,这意味着需要在发送端保持状态,并在接收端使用确认机制。第三种是最昂贵的——并因此表现最差——因为除了需要第二种方式的机制以外,还需要在接收端保持状态,以过滤重复的投递.

Akka入门示例

Maven依赖

1
2
3
4
5
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.9</version>
</dependency>

定义Actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class HelloActor extends AbstractLoggingActor {

@Override
public Receive createReceive() {
return receiveBuilder()
.match(Number.class, m -> {
log().info("[ACCEPT][String]:{}", m);
System.out.println(m);
})
.match(String.class, StringUtils::isNotBlank, m -> {
log().info("[Accept][Number]:{}", m);
System.out.println(m);
})
.build();
}

}

程序主类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class HelloMain {

public static void main(String[] args) {
// 1
ActorSystem system = ActorSystem.create("helloSystem");
// 2
ActorRef helloActor = system.actorOf(Props.create(HelloActor.class), "helloActor");
// 3
helloActor.tell("Hello, World!", ActorRef.noSender());
helloActor.tell(123, ActorRef.noSender());
helloActor.tell("Have Fun with Akka!", ActorRef.noSender());
}
}

程序解读

  1. 创建一个Actor系统,命名为”helloSystem”
  2. 每个Actor对外封闭,使用ActorRef的actorOf方法创建Actor,示例创建了一个名为”helloActor”的Actor,绑定HelloActor事件处理.
  3. 发送消息给”helloActor”,tell()方法是异步的,它只给Actor的邮箱放一封邮件,然后就返回了。tell()方法的第一个参数是消息,第二个参数是发送者,这样接收者Actor就知道是谁给自己发的消息了. 示例中在主方法中发送消息,所以首次消息发送者为ActorRef.noSender().

执行输出

1
2
3
4
5
6
Hello, World!
123
Have Fun with Akka!
[INFO] [01/26/2018 15:33:06.870] [helloSystem-akka.actor.default-dispatcher-5] [akka://helloSystem/user/helloActor] [Accept][Number]:Hello, World!
[INFO] [01/26/2018 15:33:06.871] [helloSystem-akka.actor.default-dispatcher-5] [akka://helloSystem/user/helloActor] [ACCEPT][String]:123
[INFO] [01/26/2018 15:33:06.871] [helloSystem-akka.actor.default-dispatcher-5] [akka://helloSystem/user/helloActor] [Accept][Number]:Have Fun with Akka!

Akka消息传递

示例使用Akka实现一个MapReduce的经典示例WordCount

Maven依赖

1
2
3
4
5
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.9</version>
</dependency>

工程结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
├── pom.xml
├── src
│   └── main
│   ├── java
│   │   └── com
│   │   └── elons
│   │   └── wordcount
│   │   └── akka
│   │   ├── actor
│   │   │   ├── InputActor.java
│   │   │   ├── MapperActor.java
│   │   │   ├── OutputActor.java
│   │   │   └── ReduceActor.java
│   │   ├── main
│   │   │   └── WcApplication.java
│   │   └── trans
│   │   └── Message.java
│   └── resources
│   └── application.properties

消息定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class Message {

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static final class Count implements Serializable{
private String word;
private Integer count;

@Override
public String toString() {
return JSON.toJSONString(this);
}
}

@Data
@Builder
@NoArgsConstructor
public static final class Mapper implements Serializable{

private List<Count> data;

public Mapper(List<Count> data) {
this.data = Collections.unmodifiableList(data);
}

@Override
public String toString() {
return JSON.toJSONString(this);
}
}

@Data
@Builder
@NoArgsConstructor
public static final class Reduce implements Serializable{

private Map<String, Integer> data;

public Reduce(Map<String, Integer> reduceDataList) {
this.data = Collections.unmodifiableMap(reduceDataList);
}

@Override
public String toString() {
return JSON.toJSONString(this);
}
}


public static final class Result implements Serializable{

}
}

Actor定义

InputActor定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class InputActor extends AbstractLoggingActor{

// 链式逆向织入

private ActorRef wcOutputActor = context().system()
.actorOf(Props.create(OutputActor.class), "outputActor");

private ActorRef reduceActor = context().system()
.actorOf(Props.create(ReduceActor.class, wcOutputActor), "reduceActor");

private ActorRef mapperActor = context().system()
.actorOf(Props.create(MapperActor.class, reduceActor), "mapperActor");

@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, x -> {
log().info("[事件][数据录入]: {}", x);
mapperActor.tell(x, self());
})
.match(Message.Result.class, x -> {
log().info("[事件][结果统计]");
wcOutputActor.tell(x, self());
})
.build();
}
}

MapperActor定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MapperActor extends AbstractLoggingActor{

private ActorRef reduceActor = null;

public MapperActor(ActorRef reduceActor) {
this.reduceActor = reduceActor;
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, x -> {
Message.Mapper mapper = mapper(x);
reduceActor.tell(mapper, self());
})
.build();
}

private Message.Mapper mapper(String line) {
List<Message.Count> list = Lists.newArrayList();
StringTokenizer parser = new StringTokenizer(line);
while (parser.hasMoreTokens()) {
String word = parser.nextToken().toLowerCase();
if (StringUtils.isNotBlank(word)) {
list.add(new Message.Count(word, 1));
}
}
return new Message.Mapper(list);
}
}

ReduceActor定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ReduceActor extends AbstractLoggingActor{

/**
* 结果集统计Actor
*/
private ActorRef wcOutputActor;

public ReduceActor(ActorRef wcOutputActor){
this.wcOutputActor = wcOutputActor;
}

@Override
public AbstractActor.Receive createReceive() {
return receiveBuilder()
.match(Message.Mapper.class, x -> {
log().info("[计算][Mapper]: {}", x);
// reduce the incoming data
Message.Reduce reduceData = reduce(x.getData());
// forward the result to aggregate actor
wcOutputActor.tell(reduceData, self());
})
.build();
}

private Message.Reduce reduce(List<Message.Count> coll) {
HashMap<String, Integer> reducedMap = Maps.newHashMap();
for (Message.Count count: coll) {
if (reducedMap.containsKey(count.getWord())) {
Integer value = reducedMap.get(count.getWord());
value++;
reducedMap.put(count.getWord(), value);
} else {
reducedMap.put(count.getWord(), 1);
}
}
return new Message.Reduce(reducedMap);
}

}

OutputActor定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class OutputActor extends AbstractLoggingActor {

private Map<String, Integer> wcCount = Maps.newHashMap();

@Override
public Receive createReceive() {
return receiveBuilder()
.match(Message.Reduce.class, x -> {
log().info("[计算][Reduce]: {}", x);
Map<String, Integer> map = x.getData();
countReduce(map);
})
.match(Message.Result.class, x -> {
System.out.println("结果:" + wcCount);
})
.build();
}

private void countReduce(Map<String, Integer> reduce) {
Integer count = null;
for (String key : reduce.keySet()) {
if (wcCount.containsKey(key)) {
count = wcCount.get(key) + reduce.get(key) ;
wcCount.put(key, count);
} else {
wcCount.put(key, reduce.get(key));
}
}
}
}

WcApplication测试主类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class WcApplication {

public static void main(String[] args) throws Exception {

ActorSystem system = ActorSystem.create("wordCountSystem");
ActorRef inputActor = system.actorOf(Props.create(InputActor.class),"inputActor");

// 推送待统计数据
inputActor.tell("Akka Map Reduce Demo", ActorRef.noSender());
inputActor.tell("Word Count With Akka , Map Reduce Test.", ActorRef.noSender());
Thread.sleep(500);

// 获取计算结果
inputActor.tell(new Message.Result(), null);
Thread.sleep(500);

system.terminate();
}
}

执行输出

1
2
3
4
5
6
7
8
[INFO] [01/29/2018 12:00:27.089] [wordCountSystem-akka.actor.default-dispatcher-3] [akka://wordCountSystem/user/inputActor] [事件][数据录入]: Akka Map Reduce Demo
[INFO] [01/29/2018 12:00:27.089] [wordCountSystem-akka.actor.default-dispatcher-3] [akka://wordCountSystem/user/inputActor] [事件][数据录入]: Word Count With Akka , Map Reduce Test.
[INFO] [01/29/2018 12:00:27.224] [wordCountSystem-akka.actor.default-dispatcher-3] [akka://wordCountSystem/user/reduceActor] [计算][Mapper]: {"data":[{"count":1,"word":"akka"},{"count":1,"word":"map"},{"count":1,"word":"reduce"},{"count":1,"word":"demo"}]}
[INFO] [01/29/2018 12:00:27.224] [wordCountSystem-akka.actor.default-dispatcher-3] [akka://wordCountSystem/user/reduceActor] [计算][Mapper]: {"data":[{"count":1,"word":"word"},{"count":1,"word":"count"},{"count":1,"word":"with"},{"count":1,"word":"akka"},{"count":1,"word":","},{"count":1,"word":"map"},{"count":1,"word":"reduce"},{"count":1,"word":"test."}]}
[INFO] [01/29/2018 12:00:27.228] [wordCountSystem-akka.actor.default-dispatcher-5] [akka://wordCountSystem/user/outputActor] [计算][Reduce]: {"data":{"reduce":1,"akka":1,"map":1,"demo":1}}
[INFO] [01/29/2018 12:00:27.229] [wordCountSystem-akka.actor.default-dispatcher-5] [akka://wordCountSystem/user/outputActor] [计算][Reduce]: {"data":{"reduce":1,"with":1,"count":1,",":1,"word":1,"akka":1,"map":1,"test.":1}}
结果:{reduce=2, with=1, count=1, ,=1, akka=2, map=2, demo=1, word=1, test.=1}
[INFO] [01/29/2018 12:00:27.583] [wordCountSystem-akka.actor.default-dispatcher-4] [akka://wordCountSystem/user/inputActor] [事件][结果统计]

相关文档