Akka集群
Akka集群由多个物理节点组成,节点分布在不同的JVM之上,每个节点用hostname:port:uid来唯一标识。集群的节点数量并不固定,可以动态的加入或移出节点。集群直接的状态通过gossip协议来同步。
默认情况下,集群启动时seed-node会自动加入该集群。
创建Akka集群
- 添加maven依赖
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
<version>2.5.9</version>
</dependency>
- 编写配置文件,虽然启动多个节点,但是可以共用配置文件,需要变动的内容,通过传参动态设置。
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
# 当前节点的信息,如果要启动多个节点,修改这部分内部即可
netty.tcp {
hostname = "127.0.0.1"
port = 2550
}
}
cluster {
seed-nodes = [
"akka.tcp://mysys@127.0.0.1:2551",
"akka.tcp://mysys@127.0.0.1:2552"
]
}
}
- 订阅集群相关事件,集群的状态发生变化的时候可以感知到
@Slf4j
public class SimpleClusterListener extends UntypedActor {
Cluster cluster = Cluster.get(getContext().system());
// subscribe to cluster changes
@Override
public void preStart() {
// #subscribe
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class);
// #subscribe
}
// re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof ClusterEvent.MemberUp) {
ClusterEvent.MemberUp mUp = (ClusterEvent.MemberUp) message;
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof ClusterEvent.UnreachableMember) {
ClusterEvent.UnreachableMember mUnreachable = (ClusterEvent.UnreachableMember) message;
log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof ClusterEvent.MemberRemoved) {
ClusterEvent.MemberRemoved mRemoved = (ClusterEvent.MemberRemoved) message;
log.info("Member is Removed: {}", mRemoved.member());
} else if (message instanceof ClusterEvent.MemberEvent) {
// ignore
} else {
unhandled(message);
}
}
}
- 集群启动的主函数
public class ClusterDemo {
public static void main(String[] args) {
String port = args[0];
Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
.withFallback(ConfigFactory.load());
// 配置文件中为mysys
ActorSystem system = ActorSystem.create("mysys", config);
system.actorOf(Props.create(SimpleClusterListener.class),"clusterListner" + port);
}
}
Akka集群统计单词Demo
- 首先编写后端代码:
- 订阅集群事件,如果有新的节点启动,判断其如果是前端节点,那么发送ok消息给前端节点
- 收到Article消息后,处理Article消息
@Slf4j
public class WordCountService extends AbstractActor {
Cluster cluster = Cluster.get(getContext().system());
@Override
public void preStart() throws Exception {
super.preStart();
cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
}
@Override
public void postStop() throws Exception {
super.postStop();
cluster.unsubscribe(getSelf());
}
@Override
public Receive createReceive() {
return ReceiveBuilder.create().match(FrontEndService.Article.class, this::processArticle)
.match(ClusterEvent.MemberUp.class, this::processMemberUp)
.build();
}
private void processMemberUp(ClusterEvent.MemberUp memberUp) {
Member member = memberUp.member();
log.info("Member角色:{},地址:{}", member.roles(), member.address());
if (member.hasRole("wordFE")) {
getContext().actorSelection(member.address() + "/user/wordFrontService")
.tell("ok", getSelf());
}
log.info("{} is up", member);
}
private void processArticle(FrontEndService.Article article) {
log.info("当前节点 {}, {} 正在处理", cluster.selfAddress(), getSelf());
String content = article.getContent();
int wordCount = content.split("").length;
FrontEndService.CountResult result = new FrontEndService.CountResult();
result.setCount(wordCount);
result.setId(article.getId());
getSender().tell(result, getSelf());
}
public static void main(String[] args) {
Config config = ConfigFactory
.parseString("akka.remote.netty.tcp.port=" + 2551)
.withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("mysys", config);
ActorRef backEndRef = system.actorOf(Props.create(WordCountService.class), "backEnd1");
}
}
- 编写前端Actor
- 如果收到Article消息,将其转发给后端处理
- 收到String消息,判断消息类型,如果是后端的ok消息,那么加入后端节点,其他询问消息正常返回
@Slf4j
public class FrontEndService extends AbstractActor {
private final List<ActorRef> wordCountService = new LinkedList<>();
private int jobCount = 0;
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(Article.class, this::processArticle)
.match(String.class, this::processString)
.match(Terminated.class, this::removeNode)
.build();
}
private void removeNode(Terminated sig) {
log.info("{} 被移除", getSender());
this.wordCountService.remove(getSender());
}
private void processString(String msg) {
if (msg.equals("ok")) {
ActorRef sender = getSender();
log.info("{} 可用", sender);
wordCountService.add(sender);
getContext().watch(sender);
} else if (msg.equals("isOK")) {
if (wordCountService.isEmpty()) {
getSender().tell("notOK", getSelf());
} else {
getSender().tell("ImOK", getSelf());
}
}
}
private void processArticle(Article article) {
jobCount++;
int backNodeIndex = jobCount % wordCountService.size();
log.info("选择节点:{}", backNodeIndex);
wordCountService.get(backNodeIndex).forward(article, getContext());
}
@Data
public static class Article implements Serializable {
String id;
String content;
}
@Data
public static class CountResult implements Serializable {
String id;
int count;
}
public static void main(String[] args) {
Config config = ConfigFactory
.parseString("akka.remote.netty.tcp.port=" + 2553)
.withFallback(
ConfigFactory.parseString("akka.cluster.roles=[wordFE]"))
.withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("mysys", config);
ActorRef frontRef = system.actorOf(Props.create(FrontEndService.class), "wordFrontService");
log.info("前端服务路径:{}", frontRef.path());
String result = "";
for (; ; ) {
Future<Object> future = Patterns.ask(frontRef, "isOK", 1000);
try {
result = (String) Await.result(future, Duration.create(1000, "seconds"));
if (result.equals("ImOK")) {
log.info("系统已经OK了");
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
List<Article> arts = new LinkedList<>();
File dir = new File("/User/macaoyuan/Download");
File[] files = dir.listFiles();
assert files != null;
for (File file : files) {
Article article = new Article();
article.setId(file.getName());
// 为了简化
article.setContent(file.getAbsolutePath());
arts.add(article);
}
Timeout timeout = new Timeout(Duration.create(3, TimeUnit.SECONDS));
for (Article art : arts) {
Patterns.ask(frontRef, art, timeout).onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object result) throws Throwable {
CountResult countResult = (CountResult) result;
log.info("统计结果:Id{} 数量{}", countResult.getId(), countResult.getCount());
}
}, system.dispatcher());
}
}
}
Akka Remote
官方推荐最好使用Akka Cluster,而不是直接使用Akka Remote,因为Remote模块主要就是用于给Cluster使用的底层模块,但是对于了解Cluster的细节还是有很大帮助的。
Akka有两种类型的远程方式:
- 查询:在远端节点上查询是否有对应的actor,actorSelection(path)
- 创建:在远端节点上创建actor,actorOf(Props(…), actorName)
Akka Remote的使用
- 添加依赖
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.12</artifactId>
<version>2.5.23</version>
</dependency>
- 配置文件中,开启remote能力,至少要有如下内容
akka {
actor {
# 本地运行的provider为local,远程的即为remote
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
# actor运行的主机,用于标记唯一的系统方便通信,端口设置为0的时候,它会自动选择端口
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
}
- 查询:
ActorSelection selection =
context.actorSelection("akka.tcp://app@10.0.0.1:2552/user/serviceA/worker");
selection.tell("Pretty awesome feature", getSelf());
- 创建远端Actor,在配置文件中做一些修改:
当system.actorOf(new Props(…), “sampleActor”)创建sampleActor的时候,actor不会直接的被创建而是到远端的节点上创建,配置文件比较灵活也可以添加白名单,编程方式等处理
akka {
actor {
deployment {
/sampleActor {
remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"
}
}
}
}
其余的使用方式与普通的actor没有差别