Akka之Actor
简介
Akka是使用Scala语言开发的,因为Scala也是运行在JVM之上的语言,所以我们也可以在Java中使用Akka。
Akka是使用Actor模型,其粒度比线程更小,但是却更容易去开发并发的程序。除此之外,Akka还提供了一套容错机制,允许在Actor出现异常的时候进行一些回复或者重置操作,akka除了能在单机上构建高并发程序之外,还能在网络中构建分布式程序,并提供位置透明的Actor定位服务。
Actor模型
在Akka中,消息在actor之间进行传递和处理,以此驱动任务的执行,不同于常见的OOP需要调用某个对象的方法才能做某事。
Actor的方式更像是问答的:
比如老师问同学1+1等于?然后同学听到了,回答说是2. actor之间的通信方式就如同这样。
Actor也拥有线程安全和轻量级的特点:
- 线程安全: actor运行于线程池之上,单个actor总是线程安全的,其内部的邮箱保证只有一个消息处理完之后,才会发送下一个消息。并且本身在处理接收到的消息时是串行的
- 轻量级: 大型应用中,可能同时运行着成千上万个Actor,在Akka中,每个Actor只占用300字节左右。即使单机内存不够用了,也可以方便的切换成分布式模式。
Actor的层级
Akka中的actor一直都会属于某个Parent,一般我们使用如下方式创建Actor:
getContext().actorOf()
这种方式会在已经存在的actor下面创建一个子actor,当前的actor就是新创建actor的父亲。
那么谁是第一个actor呢?
一般所有的actor都有一个共同的父亲,新的actor也可以通过如下方式创建新的actor实例
system.actorOf()
如果我们创建了一个名叫:someActor
的actor,那么它的引用路径就是:/user/someActor
实际上在我们创建actor之前,akka就已经默认的创建了3个actors,他们用于监管接下来新创建的子actor。
- Root guardian:整个ActorSystem的根,它是所有actor的父亲,并且在系统终止的时候,它也是最后一个被停止的。
- /user 这个是我们最常见到的,所有的通过Actorsystem.actorOf()方法创建的Actor都属于该分支下,这个是我们能手动创建的最高级别Actor。 其他通过ActorContext.actorOf() 方法创建的Actor都是其子级。
- /system 系统层面创建的,主要与系统的整体行为有关,在开发阶段不需要对其有过多的关注。
另外需要注意的是尽量保证每个应用程序内部只需要一个ActorSystem对象
Actor的层级关系Demo:
public class ActorHierarchyDemo {
public static void main(String[] args) throws java.io.IOException {
ActorSystem system = ActorSystem.create("macaoyuan");
ActorRef firstRef = system.actorOf(PrintMyActorRefActor.props(), "first-actor");
System.out.println("First: " + firstRef);
firstRef.tell("print", ActorRef.noSender());
System.out.println(">>> Press ENTER to exit <<<");
try {
System.in.read();
} finally {
system.terminate();
}
}
static class PrintMyActorRefActor extends AbstractActor {
static Props props() {
return Props.create(PrintMyActorRefActor.class, PrintMyActorRefActor::new);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"print",
p -> {
ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
System.out.println("Second: " + secondRef);
})
.build();
}
}
}
运行结果:
First: Actor[akka://macaoyuan/user/first-actor#99771086]
>>> Press ENTER to exit <<<
Second: Actor[akka://macaoyuan/user/first-actor/second-actor#-811498937]
- akka是协议的前缀,两个路径都是akka的协议
- testSystem是我们创建的ActorSystem名称,我们也可以去指定其他的名称
- 因为second-actor是通过first-actor创建的,所以在路径上是它的孩子
- 最后的那些数字标识符不重要,一般我们可以忽略掉
Actor的生命周期
每个actor都会经历“生老病死”的阶段, 在特定的阶段,我们要做一些正确的事情。Actor暴露了一些生命周期管理的API,但是最常用的是preStart与postStop
- preStart():在actor启动但是还没有处理第一条消息之前运行
- postStop():在actor停止之前,这个时候已经没有任何的消息可以处理了
想要去停止一个actor,最好在actor内部去使用
getContext().stop(getSelf())
而不要在另外一个actor去停止其他的actor:getContext().stop(actorRef),这样的做法可能会导致一定的风险,一般我们是给要停止的actor发送一个停止消息。
Demo:
public class ActorLifecycleDemo {
static class StartStopActor1 extends AbstractActor {
static Props props() {
return Props.create(StartStopActor1.class, StartStopActor1::new);
}
@Override
public void preStart() {
System.out.println("first started");
getContext().actorOf(StartStopActor2.props(), "second");
}
@Override
public void postStop() {
System.out.println("first stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"stop",
s -> {
getContext().stop(getSelf());
})
.build();
}
}
static class StartStopActor2 extends AbstractActor {
static Props props() {
return Props.create(StartStopActor2.class, StartStopActor2::new);
}
@Override
public void preStart() {
System.out.println("second started");
}
@Override
public void postStop() {
System.out.println("second stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder().build();
}
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("macaoyuan");
ActorRef first = system.actorOf(StartStopActor1.props(), "first");
first.tell("stop", ActorRef.noSender());
}
}
运行结果:
first started
second started
second stopped
first stopped
当我们想要停止first actor的时候,它会首先去停止它的子actor。
错误处理
父子actor通过它们的生命周期连接在一起,当某个actor出现错误的时候,它会被临时的挂起,当失败传递到其父亲actor的时候,父亲actor会判断要怎么处理这个错误信息。从这个角度来看得话,父亲actor是作为子actor的监管,默认的监管策略是停止然后重启子Actor,当然我们也可以自定义自己的监管策略。
默认的监管策略:
public class SupervisorDemo {
// 监管的
static class SupervisingActor extends AbstractActor {
static Props props() {
return Props.create(SupervisingActor.class, SupervisingActor::new);
}
ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor");
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"failChild",
f -> {
child.tell("fail", getSelf());
})
.build();
}
}
// 被监管的
static class SupervisedActor extends AbstractActor {
static Props props() {
return Props.create(SupervisedActor.class, SupervisedActor::new);
}
@Override
public void preStart() {
System.out.println("supervised actor started");
}
@Override
public void postStop() {
System.out.println("supervised actor stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"fail",
f -> {
System.out.println("supervised actor fails now");
throw new Exception("I failed!");
})
.build();
}
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("macaoyuan");
ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor");
supervisingActor.tell("failChild", ActorRef.noSender());
}
}
运行结果:
/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/bin/java -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=54806:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/rt.jar:/Users/mcy/IdeaProjects/Test/target/test-classes:/Users/mcy/IdeaProjects/Test/target/classes:/Users/mcy/Documents/devTools/mavenRepository/javax/enterprise/cdi-api/2.0.SP1/cdi-api-2.0.SP1.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/el/javax.el-api/3.0.0/javax.el-api-3.0.0.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/interceptor/javax.interceptor-api/1.2/javax.interceptor-api-1.2.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/inject/javax.inject/1/javax.inject-1.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/servlet/javax.servlet-api/4.0.1/javax.servlet-api-4.0.1.jar:/Users/mcy/Documents/devTools/mavenRepository/org/junit/jupiter/junit-jupiter-api/5.9.2/junit-jupiter-api-5.9.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/opentest4j/opentest4j/1.2.0/opentest4j-1.2.0.jar:/Users/mcy/Documents/devTools/mavenRepository/org/junit/platform/junit-platform-commons/1.9.2/junit-platform-commons-1.9.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/apiguardian/apiguardian-api/1.1.2/apiguardian-api-1.1.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/junit/jupiter/junit-jupiter-engine/5.9.2/junit-jupiter-engine-5.9.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/junit/platform/junit-platform-engine/1.9.2/junit-platform-engine-1.9.2.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-cluster_2.11/2.5.9/akka-cluster_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-remote_2.11/2.5.9/akka-remote_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-actor_2.11/2.5.9/akka-actor_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/config/1.3.2/config-1.3.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/scala-lang/modules/scala-java8-compat_2.11/0.7.0/scala-java8-compat_2.11-0.7.0.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-stream_2.11/2.5.9/akka-stream_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/org/reactivestreams/reactive-streams/1.0.2/reactive-streams-1.0.2.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/ssl-config-core_2.11/0.2.2/ssl-config-core_2.11-0.2.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1.0.4.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-protobuf_2.11/2.5.9/akka-protobuf_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/io/netty/netty/3.10.6.Final/netty-3.10.6.Final.jar:/Users/mcy/Documents/devTools/mavenRepository/io/aeron/aeron-driver/1.7.0/aeron-driver-1.7.0.jar:/Users/mcy/Documents/devTools/mavenRepository/io/aeron/aeron-client/1.7.0/aeron-client-1.7.0.jar:/Users/mcy/Documents/devTools/mavenRepository/org/agrona/agrona/0.9.12/agrona-0.9.12.jar SupervisorDemo
supervised actor started
supervised actor fails now
supervised actor stopped
supervised actor started
[ERROR] [08/23/2023 15:56:01.417] [macaoyuan-akka.actor.default-dispatcher-2] [akka://macaoyuan/user/supervising-actor/supervised-actor] I failed!
java.lang.Exception: I failed!
at SupervisorDemo$SupervisedActor.lambda$createReceive$0(SupervisorDemo.java:51)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
虽然supervised失败抛出异常,但是程序又重新的启动了起来了