Akka调度器和远程调用

在没有为Actor作配置的情况下,每一个ActorSystem将有一个缺省的派发器。该缺省派发器可以被配置,默认是使用指定的default-executor的一个Dispatcher

Akka Dispatcher应用

配置文件

application.conf 内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
DispatcherExample{
defaultDispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 6
}
}
defaultDispatcher1 {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 1
core-pool-size-factor = 2.0
core-pool-size-max = 6
}
throughput = 2
}
}

事件处理Actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class EchoActor extends AbstractLoggingActor{

private static AtomicInteger instance = new AtomicInteger(0);

@Override
public void preStart() {
log().info("实例 #" + instance.incrementAndGet() + ", Hashcode #" + this.hashCode());
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, x->{
log().info("[接收消息]:{}, Actor:{}, Thread:{}", x, self().path() , Thread.currentThread().getName());
})
.build();
}

}

主类测试示例

主类示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class DispatcherMain {

public static void main(String[] args) throws Exception {

ActorSystem system = ActorSystem.create("dispatcherSystem", ConfigFactory.load().getConfig("DispatcherExample"));
Props props = Props.create(EchoActor.class).withDispatcher("defaultDispatcher").withRouter(new RoundRobinPool(3));

ActorRef actor = system.actorOf(props, "echoActor");
for (int i = 0; i < 10; i++) {
actor.tell("消息:" + i, ActorRef.noSender());
}

Thread.sleep(1000);
system.terminate();
}

}

测试示例输出

1
2
3
4
5
6
7
8
9
10
11
12
13
[INFO] [01/29/2018 17:57:12.527] [dispatcherSystem-defaultDispatcher-7] [akka://dispatcherSystem/user/echoActor/$b] 实例 #3, Hashcode #2058102743
[INFO] [01/29/2018 17:57:12.527] [dispatcherSystem-defaultDispatcher-6] [akka://dispatcherSystem/user/echoActor/$a] 实例 #2, Hashcode #1700945207
[INFO] [01/29/2018 17:57:12.529] [dispatcherSystem-defaultDispatcher-8] [akka://dispatcherSystem/user/echoActor/$c] 实例 #1, Hashcode #1494635147
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-6] [akka://dispatcherSystem/user/echoActor/$a] [接收消息]:消息:0, Actor:akka://dispatcherSystem/user/echoActor/$a, Thread:dispatcherSystem-defaultDispatcher-6
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-7] [akka://dispatcherSystem/user/echoActor/$b] [接收消息]:消息:1, Actor:akka://dispatcherSystem/user/echoActor/$b, Thread:dispatcherSystem-defaultDispatcher-7
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-8] [akka://dispatcherSystem/user/echoActor/$c] [接收消息]:消息:2, Actor:akka://dispatcherSystem/user/echoActor/$c, Thread:dispatcherSystem-defaultDispatcher-8
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-6] [akka://dispatcherSystem/user/echoActor/$a] [接收消息]:消息:3, Actor:akka://dispatcherSystem/user/echoActor/$a, Thread:dispatcherSystem-defaultDispatcher-6
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-7] [akka://dispatcherSystem/user/echoActor/$b] [接收消息]:消息:4, Actor:akka://dispatcherSystem/user/echoActor/$b, Thread:dispatcherSystem-defaultDispatcher-7
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-8] [akka://dispatcherSystem/user/echoActor/$c] [接收消息]:消息:5, Actor:akka://dispatcherSystem/user/echoActor/$c, Thread:dispatcherSystem-defaultDispatcher-8
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-6] [akka://dispatcherSystem/user/echoActor/$a] [接收消息]:消息:6, Actor:akka://dispatcherSystem/user/echoActor/$a, Thread:dispatcherSystem-defaultDispatcher-6
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-7] [akka://dispatcherSystem/user/echoActor/$b] [接收消息]:消息:7, Actor:akka://dispatcherSystem/user/echoActor/$b, Thread:dispatcherSystem-defaultDispatcher-7
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-8] [akka://dispatcherSystem/user/echoActor/$c] [接收消息]:消息:8, Actor:akka://dispatcherSystem/user/echoActor/$c, Thread:dispatcherSystem-defaultDispatcher-8
[INFO] [01/29/2018 17:57:12.530] [dispatcherSystem-defaultDispatcher-6] [akka://dispatcherSystem/user/echoActor/$a] [接收消息]:消息:9, Actor:akka://dispatcherSystem/user/echoActor/$a, Thread:dispatcherSystem-defaultDispatcher-6

切换Dispatcher测试

主类代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class DispatcherMain {

public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create(
"default-dispatcher",
ConfigFactory.load().getConfig("DispatcherExample"));

ActorRef actor = system.actorOf(
Props.create(EchoActor.class).withDispatcher("defaultDispatcher1").withRouter(new RoundRobinPool(3))
, "echoActor");

for (int i = 0; i < 10; i++) {
actor.tell("消息:" + i, ActorRef.noSender());
}

Thread.sleep(1000);
system.terminate();
}

}

执行输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[INFO] [01/29/2018 18:02:44.600] [dispatcherSystem-defaultDispatcher1-5] [akka://dispatcherSystem/user/echoActor/$a] 实例 #3, Hashcode #1590930020
[INFO] [01/29/2018 18:02:44.600] [dispatcherSystem-defaultDispatcher1-6] [akka://dispatcherSystem/user/echoActor/$b] 实例 #1, Hashcode #938937247
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-7] [akka://dispatcherSystem/user/echoActor/$c] 实例 #2, Hashcode #904652545
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-5] [akka://dispatcherSystem/user/echoActor/$a] [接收消息]:消息:0, Actor:akka://dispatcherSystem/user/echoActor/$a, Thread:dispatcherSystem-defaultDispatcher1-5
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-6] [akka://dispatcherSystem/user/echoActor/$b] [接收消息]:消息:1, Actor:akka://dispatcherSystem/user/echoActor/$b, Thread:dispatcherSystem-defaultDispatcher1-6
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-7] [akka://dispatcherSystem/user/echoActor/$c] [接收消息]:消息:2, Actor:akka://dispatcherSystem/user/echoActor/$c, Thread:dispatcherSystem-defaultDispatcher1-7
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-5] [akka://dispatcherSystem/user/echoActor/$a] [接收消息]:消息:3, Actor:akka://dispatcherSystem/user/echoActor/$a, Thread:dispatcherSystem-defaultDispatcher1-5
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-6] [akka://dispatcherSystem/user/echoActor/$b] [接收消息]:消息:4, Actor:akka://dispatcherSystem/user/echoActor/$b, Thread:dispatcherSystem-defaultDispatcher1-6
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-7] [akka://dispatcherSystem/user/echoActor/$c] [接收消息]:消息:5, Actor:akka://dispatcherSystem/user/echoActor/$c, Thread:dispatcherSystem-defaultDispatcher1-7
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-8] [akka://dispatcherSystem/user/echoActor/$a] [接收消息]:消息:6, Actor:akka://dispatcherSystem/user/echoActor/$a, Thread:dispatcherSystem-defaultDispatcher1-8
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-9] [akka://dispatcherSystem/user/echoActor/$b] [接收消息]:消息:7, Actor:akka://dispatcherSystem/user/echoActor/$b, Thread:dispatcherSystem-defaultDispatcher1-9
[INFO] [01/29/2018 18:02:44.603] [dispatcherSystem-defaultDispatcher1-8] [akka://dispatcherSystem/user/echoActor/$a] [接收消息]:消息:9, Actor:akka://dispatcherSystem/user/echoActor/$a, Thread:dispatcherSystem-defaultDispatcher1-8
[INFO] [01/29/2018 18:02:44.604] [dispatcherSystem-defaultDispatcher1-10] [akka://dispatcherSystem/user/echoActor/$c] [接收消息]:消息:8, Actor:akka://dispatcherSystem/user/echoActor/$c, Thread:dispatcherSystem-defaultDispatcher1-10
`

Akka Remote应用

Akka是一种消息驱动运算模式,它实现跨JVM程序运算的方式是通过能跨JVM的消息系统来调动分布在不同JVM上ActorSystem中的Actor进行运算,前题是Akka的地址系统可以支持跨JVM定位。Akka的消息系统最高境界可以实现所谓的Actor位置透明化,这样在Akka编程中就无须关注Actor具体在哪个JVM上运行,分布式Actor编程从方式上跟普通Actor编程就不会有什么区别了。Akka的Remoting是一种点对点的跨JVM消息通道,让一个JVM上ActorSystem中的某个Actor可以连接另一个JVM上ActorSystem中的另一个Actor。两个JVM上的ActorSystem之间只需具备TCP网络连接功能就可以实现Akka Remoting了。Akka-Remoting还没有实现完全的位置透明化,因为用户还必须在代码里或者配置文件里指明目标Actor的具体地址.

Maven依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.2</version>
</dependency>

服务端消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class EchoActor extends AbstractLoggingActor{

private static AtomicInteger instance = new AtomicInteger(0);

@Override
public void preStart() {
log().info("启动实例 #" + instance.incrementAndGet() + ", Hashcode #" + this.hashCode());
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, x->{
log().info("[接收消息]:{}, Actor:{}", x, getSender().path());
})
.build();
}
}

服务端主类示例

主类代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ServerSystem {

public static void main(String[] args) {
ActorSystem system = ActorSystem.create("ServerSys", serverConfig());
ActorRef actor = system.actorOf(Props.create(EchoActor.class), "echoActor");
actor.tell("服务端启动...", ActorRef.noSender());
}

public static Config serverConfig(){
Map<String, Object> map = Maps.newHashMap();
map.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider");
map.put("akka.remote.transport", "akka.remote.netty.NettyRemoteTransport");
map.put("akka.remote.netty.tcp.hostname", "127.0.0.1");
map.put("akka.remote.netty.tcp.port", "2500");
return ConfigFactory.parseMap(map);
}
}

启动日志

1
2
3
4
5
[INFO] [01/29/2018 16:48:17.553] [main] [akka.remote.Remoting] Starting remoting
[INFO] [01/29/2018 16:48:17.708] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ServerSys@127.0.0.1:2500]
[INFO] [01/29/2018 16:48:17.709] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ServerSys@127.0.0.1:2500]
[INFO] [01/29/2018 16:48:17.734] [ServerSys-akka.actor.default-dispatcher-2] [akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor] 启动实例 #1, Hashcode #242972067
[INFO] [01/29/2018 16:48:17.734] [ServerSys-akka.actor.default-dispatcher-2] [akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor] [接收消息]:服务端启动..., Actor:akka://ServerSys/user/echoActor

客户端主类示例

主类代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ClientSystem {

public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("ClientSys", clientConfig());
String path = "akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor";
ActorRef remoteActor = system.actorFor(path);
for(int i=0; i<5; i++){
remoteActor.tell("客户端推送发送" + i, ActorRef.noSender());
}
}

public static Config clientConfig(){
Map<String, Object> map = Maps.newHashMap();
map.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider");
map.put("akka.remote.transport", "akka.remote.netty.NettyRemoteTransport");
map.put("akka.remote.netty.tcp.hostname", "127.0.0.1");
map.put("akka.remote.netty.tcp.port", "2600");
return ConfigFactory.parseMap(map);
}
}

启动日志

1
2
3
[INFO] [01/29/2018 16:48:26.999] [main] [akka.remote.Remoting] Starting remoting
[INFO] [01/29/2018 16:48:27.168] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClientSys@127.0.0.1:2600]
[INFO] [01/29/2018 16:48:27.169] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ClientSys@127.0.0.1:2600]

服务端日志

1
2
3
4
5
6
7
8
9
10
[INFO] [01/29/2018 16:48:17.553] [main] [akka.remote.Remoting] Starting remoting
[INFO] [01/29/2018 16:48:17.708] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ServerSys@127.0.0.1:2500]
[INFO] [01/29/2018 16:48:17.709] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ServerSys@127.0.0.1:2500]
[INFO] [01/29/2018 16:48:17.734] [ServerSys-akka.actor.default-dispatcher-2] [akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor] 启动实例 #1, Hashcode #242972067
[INFO] [01/29/2018 16:48:17.734] [ServerSys-akka.actor.default-dispatcher-2] [akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor] [接收消息]:服务端启动..., Actor:akka://ServerSys/user/echoActor
[INFO] [01/29/2018 16:48:27.326] [ServerSys-akka.actor.default-dispatcher-4] [akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor] [接收消息]:客户端推送发送0, Actor:akka://ServerSys/user/echoActor
[INFO] [01/29/2018 16:48:27.326] [ServerSys-akka.actor.default-dispatcher-4] [akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor] [接收消息]:客户端推送发送1, Actor:akka://ServerSys/user/echoActor
[INFO] [01/29/2018 16:48:27.326] [ServerSys-akka.actor.default-dispatcher-4] [akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor] [接收消息]:客户端推送发送2, Actor:akka://ServerSys/user/echoActor
[INFO] [01/29/2018 16:48:27.326] [ServerSys-akka.actor.default-dispatcher-4] [akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor] [接收消息]:客户端推送发送3, Actor:akka://ServerSys/user/echoActor
[INFO] [01/29/2018 16:48:27.326] [ServerSys-akka.actor.default-dispatcher-4] [akka.tcp://ServerSys@127.0.0.1:2500/user/echoActor] [接收消息]:客户端推送发送4, Actor:akka://ServerSys/user/echoActor

相关文档