Akka 集群路由

Akka 集群路由

马草原 308 2021-08-19

Akka 集群路由

集群中的路由可以意识到集群中成员的状态,添加新的routees或者在集群的节点上查找routees,当节点离开集群或者不可达的时候,离开节点的routees自动的从router中移除,当新的节点加入的时候,routees会自动的加入route中。

解释下Akka中的两个概念,Router和Routee

Router:表示路由器,相当于消息的中转站,既可以是一个Router对象,也可以是一个Actor引用

Routee:表示路由目标,从路由器转发出来的消息会进入路由目标

akka

Akka有两种类型的路由器:

  • Group - router that sends messages to the specified path using actor selection,路由器通过actor selection发送消息到指定的路径,一种用法是集群中的后端服务运行一些服务,前端节点通过路由器(Router)来查找后端服务
  • Pool - router that creates routees as child actors and deploys them on remote nodes,路由器创建子Actor,然后部署Actor到远端的节点上,每一个路由器都有自己的Routee实例,例如:
    • 在一个10节点的集群中启动了3个Router,你会有30个Routee。不同的Router创建的Routees不能被共享
    • 主要用在当运行任务的时候,分发实际的计算给其它节点,一般临时生成routee actor

Demo For Java

添加依赖:

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster_2.12</artifactId>
  <version>2.5.23</version>
</dependency>

Group Router

当使用Group,必须启动routee actor在集群中节点上,Routee actor一般在Router actor启动之前创建好。配置如下:

akka.actor.deployment {
  /statsService/workerRouter {
      router = consistent-hashing-group
      routees.paths = ["/user/statsWorker"]
      cluster {
        enabled = on
        allow-local-routees = on
        use-roles = ["compute"]
      }
    }
}
  • routees.paths 用于选择actor,router要转发消息的路径,路径中不要包含协议和地址信息
  • max-total-nr-of-instances 可以定义集群中总的Routees,默认值为10000,尽量将值设置的低一点
  • use-roles 限制在集群中的什么角色上查找routees

Demo提供了一个服务用于做文本统计,将文本分隔,然后把单词数到后端的worker actor上。计数结果再返回给aggregator计算每个单词的平均字符数。

定义实体类:

public interface StatsMessages {

    @Getter
    class StatsJob implements Serializable {
        private final String text;

        public StatsJob(String text) {
            this.text = text;
        }

    }

    @Getter
    class StatsResult implements Serializable {
        private final double meanWordLength;

        public StatsResult(double meanWordLength) {
            this.meanWordLength = meanWordLength;
        }

        @Override
        public String toString() {
            return "meanWordLength: " + meanWordLength;
        }
    }

    @Getter
    class JobFailed implements Serializable {
        private final String reason;

        public JobFailed(String reason) {
            this.reason = reason;
        }

        @Override
        public String toString() {
            return "JobFailed(" + reason + ")";
        }
    }
}

统计每个单词的字符数:

public class StatsWorker extends AbstractActor {
    Map<String, Integer> cache = new HashMap<String, Integer>();

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        String.class,
                        word -> {
                            Integer length = cache.computeIfAbsent(word, String::length);
                            getSender().tell(length, getSelf());
                        })
                .build();
    }
}

后端服务收到文本,然后分隔为不同的单词,分发到worker节点和aggregates上

public class StatsService extends AbstractActor {

    ActorRef workerRouter =
            getContext()
                    .actorOf(FromConfig.getInstance().props(Props.create(StatsWorker.class)), "workerRouter");

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        StatsMessages.StatsJob.class,
                        job -> !job.getText().isEmpty(),
                        job -> {
                            String[] words = job.getText().split(" ");
                            ActorRef replyTo = getSender();

                            // worker节点回复的目标actor
                            ActorRef aggregator =
                                    getContext().actorOf(Props.create(StatsAggregator.class, words.length, replyTo));

                            // send each word to a worker
                            for (String word : words) {
                                workerRouter.tell(new ConsistentHashingRouter.ConsistentHashableEnvelope(word, word), aggregator);
                            }
                        })
                .build();
    }
}
public class StatsAggregator extends AbstractActor {

    final int expectedResults;
    final ActorRef replyTo;
    final List<Integer> results = new ArrayList<Integer>();

    public StatsAggregator(int expectedResults, ActorRef replyTo) {
        this.expectedResults = expectedResults;
        this.replyTo = replyTo;
    }

    @Override
    public void preStart() {
        getContext().setReceiveTimeout(Duration.ofSeconds(3));
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        Integer.class,
                        wordCount -> {
                            results.add(wordCount);
                            if (results.size() == expectedResults) {
                                int sum = 0;
                                for (int c : results) {
                                    sum += c;
                                }
                                double meanWordLength = ((double) sum) / results.size();
                                replyTo.tell(new StatsResult(meanWordLength), getSelf());
                                getContext().stop(getSelf());
                            }
                        })
                .match(
                        ReceiveTimeout.class,
                        x -> {
                            replyTo.tell(new JobFailed("Service unavailable, try again later"), getSelf());
                            getContext().stop(getSelf());
                        })
                .build();
    }
}

以上程序,当用户的请求从集群中的某个节点发送给StatsService的时候,它会利用后端所有节点的StatsWorker Actor。

先启动worker节点,然后再启动service节点

system.actorOf(Props.create(StatsWorker.class), "statsWorker");
system.actorOf(Props.create(StatsService.class), "statsService");

Pool Router

pool的router配置文件看起来如下:

akka.actor.deployment {
  /statsService/singleton/workerRouter {
      router = consistent-hashing-pool
      cluster {
        enabled = on
        max-nr-of-instances-per-node = 3
        allow-local-routees = on
        use-roles = ["compute"]
      }
    }
}

use-roles routees必须要在集群中指定’compute’角色的节点启动部署

创建一个statService的单例actor,在集群中的每个节点上,创建单例的代理节点,代理节点跟踪当前Master节点位置,代理任务到statServie节点

      ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create(system)
          .withRole("compute");
      system.actorOf(ClusterSingletonManager.props(
          Props.create(StatsService.class), PoisonPill.getInstance(), settings),
          "statsService");

      ClusterSingletonProxySettings proxySettings =
          ClusterSingletonProxySettings.create(system).withRole("compute");
      system.actorOf(ClusterSingletonProxy.props("/user/statsService",
          proxySettings), "statsServiceProxy");

其余的代码同Group Router。

小结

Group Router需要提前创建好Actor
Pool Router 在路由的时候自己创建子Actor并将消息转发过去

在真正要进行路由的Actor中创建Router:

public class StatsService extends AbstractActor {
  ActorRef workerRouter = getContext().actorOf(
      FromConfig.getInstance().props(Props.create(StatsWorker.class)),
      "workerRouter");
  ...  
}