Akka 集群路由
集群中的路由可以意识到集群中成员的状态,添加新的routees或者在集群的节点上查找routees,当节点离开集群或者不可达的时候,离开节点的routees自动的从router中移除,当新的节点加入的时候,routees会自动的加入route中。
解释下Akka中的两个概念,Router和Routee
Router:表示路由器,相当于消息的中转站,既可以是一个Router对象,也可以是一个Actor引用
Routee:表示路由目标,从路由器转发出来的消息会进入路由目标
Akka有两种类型的路由器:
- Group - router that sends messages to the specified path using actor selection,路由器通过actor selection发送消息到指定的路径,一种用法是集群中的后端服务运行一些服务,前端节点通过路由器(Router)来查找后端服务
- Pool - router that creates routees as child actors and deploys them on remote nodes,路由器创建子Actor,然后部署Actor到远端的节点上,每一个路由器都有自己的Routee实例,例如:
- 在一个10节点的集群中启动了3个Router,你会有30个Routee。不同的Router创建的Routees不能被共享
- 主要用在当运行任务的时候,分发实际的计算给其它节点,一般临时生成routee actor
Demo For Java
添加依赖:
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.12</artifactId>
<version>2.5.23</version>
</dependency>
Group Router
当使用Group,必须启动routee actor在集群中节点上,Routee actor一般在Router actor启动之前创建好。配置如下:
akka.actor.deployment {
/statsService/workerRouter {
router = consistent-hashing-group
routees.paths = ["/user/statsWorker"]
cluster {
enabled = on
allow-local-routees = on
use-roles = ["compute"]
}
}
}
- routees.paths 用于选择actor,router要转发消息的路径,路径中不要包含协议和地址信息
- max-total-nr-of-instances 可以定义集群中总的Routees,默认值为10000,尽量将值设置的低一点
- use-roles 限制在集群中的什么角色上查找routees
Demo提供了一个服务用于做文本统计,将文本分隔,然后把单词数到后端的worker actor上。计数结果再返回给aggregator计算每个单词的平均字符数。
定义实体类:
public interface StatsMessages {
@Getter
class StatsJob implements Serializable {
private final String text;
public StatsJob(String text) {
this.text = text;
}
}
@Getter
class StatsResult implements Serializable {
private final double meanWordLength;
public StatsResult(double meanWordLength) {
this.meanWordLength = meanWordLength;
}
@Override
public String toString() {
return "meanWordLength: " + meanWordLength;
}
}
@Getter
class JobFailed implements Serializable {
private final String reason;
public JobFailed(String reason) {
this.reason = reason;
}
@Override
public String toString() {
return "JobFailed(" + reason + ")";
}
}
}
统计每个单词的字符数:
public class StatsWorker extends AbstractActor {
Map<String, Integer> cache = new HashMap<String, Integer>();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
String.class,
word -> {
Integer length = cache.computeIfAbsent(word, String::length);
getSender().tell(length, getSelf());
})
.build();
}
}
后端服务收到文本,然后分隔为不同的单词,分发到worker节点和aggregates上
public class StatsService extends AbstractActor {
ActorRef workerRouter =
getContext()
.actorOf(FromConfig.getInstance().props(Props.create(StatsWorker.class)), "workerRouter");
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
StatsMessages.StatsJob.class,
job -> !job.getText().isEmpty(),
job -> {
String[] words = job.getText().split(" ");
ActorRef replyTo = getSender();
// worker节点回复的目标actor
ActorRef aggregator =
getContext().actorOf(Props.create(StatsAggregator.class, words.length, replyTo));
// send each word to a worker
for (String word : words) {
workerRouter.tell(new ConsistentHashingRouter.ConsistentHashableEnvelope(word, word), aggregator);
}
})
.build();
}
}
public class StatsAggregator extends AbstractActor {
final int expectedResults;
final ActorRef replyTo;
final List<Integer> results = new ArrayList<Integer>();
public StatsAggregator(int expectedResults, ActorRef replyTo) {
this.expectedResults = expectedResults;
this.replyTo = replyTo;
}
@Override
public void preStart() {
getContext().setReceiveTimeout(Duration.ofSeconds(3));
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
Integer.class,
wordCount -> {
results.add(wordCount);
if (results.size() == expectedResults) {
int sum = 0;
for (int c : results) {
sum += c;
}
double meanWordLength = ((double) sum) / results.size();
replyTo.tell(new StatsResult(meanWordLength), getSelf());
getContext().stop(getSelf());
}
})
.match(
ReceiveTimeout.class,
x -> {
replyTo.tell(new JobFailed("Service unavailable, try again later"), getSelf());
getContext().stop(getSelf());
})
.build();
}
}
以上程序,当用户的请求从集群中的某个节点发送给StatsService的时候,它会利用后端所有节点的StatsWorker Actor。
先启动worker节点,然后再启动service节点
system.actorOf(Props.create(StatsWorker.class), "statsWorker");
system.actorOf(Props.create(StatsService.class), "statsService");
Pool Router
pool的router配置文件看起来如下:
akka.actor.deployment {
/statsService/singleton/workerRouter {
router = consistent-hashing-pool
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
use-roles = ["compute"]
}
}
}
use-roles routees必须要在集群中指定’compute’角色的节点启动部署
创建一个statService的单例actor,在集群中的每个节点上,创建单例的代理节点,代理节点跟踪当前Master节点位置,代理任务到statServie节点
ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create(system)
.withRole("compute");
system.actorOf(ClusterSingletonManager.props(
Props.create(StatsService.class), PoisonPill.getInstance(), settings),
"statsService");
ClusterSingletonProxySettings proxySettings =
ClusterSingletonProxySettings.create(system).withRole("compute");
system.actorOf(ClusterSingletonProxy.props("/user/statsService",
proxySettings), "statsServiceProxy");
其余的代码同Group Router。
小结
Group Router需要提前创建好Actor
Pool Router 在路由的时候自己创建子Actor并将消息转发过去
在真正要进行路由的Actor中创建Router:
public class StatsService extends AbstractActor {
ActorRef workerRouter = getContext().actorOf(
FromConfig.getInstance().props(Props.create(StatsWorker.class)),
"workerRouter");
...
}