Akka集群

Akka集群

马草原 412 2021-08-18

Akka集群

Akka集群由多个物理节点组成,节点分布在不同的JVM之上,每个节点用hostname:port:uid来唯一标识。集群的节点数量并不固定,可以动态的加入或移出节点。集群直接的状态通过gossip协议来同步。

默认情况下,集群启动时seed-node会自动加入该集群。

创建Akka集群

  1. 添加maven依赖
       <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster_2.11</artifactId>
            <version>2.5.9</version>
        </dependency>
  1. 编写配置文件,虽然启动多个节点,但是可以共用配置文件,需要变动的内容,通过传参动态设置。
akka {
    actor {
        provider = "akka.cluster.ClusterActorRefProvider"
    }

    remote {
        # 当前节点的信息,如果要启动多个节点,修改这部分内部即可
        netty.tcp {
            hostname = "127.0.0.1"
            port = 2550
        }
    }

    cluster {
        seed-nodes = [
              "akka.tcp://mysys@127.0.0.1:2551",
              "akka.tcp://mysys@127.0.0.1:2552"
         ]
    }
}
  1. 订阅集群相关事件,集群的状态发生变化的时候可以感知到
@Slf4j
public class SimpleClusterListener extends UntypedActor {
    Cluster cluster = Cluster.get(getContext().system());

    // subscribe to cluster changes
    @Override
    public void preStart() {
        // #subscribe
        cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class);
        // #subscribe
    }

    // re-subscribe when restart
    @Override
    public void postStop() {
        cluster.unsubscribe(getSelf());
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof ClusterEvent.MemberUp) {
            ClusterEvent.MemberUp mUp = (ClusterEvent.MemberUp) message;
            log.info("Member is Up: {}", mUp.member());

        } else if (message instanceof ClusterEvent.UnreachableMember) {
            ClusterEvent.UnreachableMember mUnreachable = (ClusterEvent.UnreachableMember) message;
            log.info("Member detected as unreachable: {}", mUnreachable.member());

        } else if (message instanceof ClusterEvent.MemberRemoved) {
            ClusterEvent.MemberRemoved mRemoved = (ClusterEvent.MemberRemoved) message;
            log.info("Member is Removed: {}", mRemoved.member());

        } else if (message instanceof ClusterEvent.MemberEvent) {
            // ignore
        } else {
            unhandled(message);
        }
    }
}
  1. 集群启动的主函数
public class ClusterDemo {
    public static void main(String[] args) {
        String port = args[0];
        Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
                .withFallback(ConfigFactory.load());
		// 配置文件中为mysys
        ActorSystem system = ActorSystem.create("mysys", config);
        system.actorOf(Props.create(SimpleClusterListener.class),"clusterListner" + port);
    }
}

Akka集群统计单词Demo

image-1692779967217

  1. 首先编写后端代码:
  • 订阅集群事件,如果有新的节点启动,判断其如果是前端节点,那么发送ok消息给前端节点
  • 收到Article消息后,处理Article消息
@Slf4j
public class WordCountService extends AbstractActor {

    Cluster cluster = Cluster.get(getContext().system());

    @Override
    public void preStart() throws Exception {
        super.preStart();
        cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        cluster.unsubscribe(getSelf());
    }

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create().match(FrontEndService.Article.class, this::processArticle)
                .match(ClusterEvent.MemberUp.class, this::processMemberUp)
                .build();
    }

    private void processMemberUp(ClusterEvent.MemberUp memberUp) {
        Member member = memberUp.member();
        log.info("Member角色:{},地址:{}", member.roles(), member.address());
        if (member.hasRole("wordFE")) {
            getContext().actorSelection(member.address() + "/user/wordFrontService")
                    .tell("ok", getSelf());
        }
        log.info("{} is up", member);
    }

    private void processArticle(FrontEndService.Article article) {
        log.info("当前节点 {}, {} 正在处理", cluster.selfAddress(), getSelf());
        String content = article.getContent();
        int wordCount = content.split("").length;

        FrontEndService.CountResult result = new FrontEndService.CountResult();
        result.setCount(wordCount);
        result.setId(article.getId());
        getSender().tell(result, getSelf());
    }

    public static void main(String[] args) {
        Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2551)
                .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("mysys", config);
        ActorRef backEndRef = system.actorOf(Props.create(WordCountService.class), "backEnd1");

    }
}
  1. 编写前端Actor
  • 如果收到Article消息,将其转发给后端处理
  • 收到String消息,判断消息类型,如果是后端的ok消息,那么加入后端节点,其他询问消息正常返回
@Slf4j
public class FrontEndService extends AbstractActor {

    private final List<ActorRef> wordCountService = new LinkedList<>();

    private int jobCount = 0;


    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .match(Article.class, this::processArticle)
                .match(String.class, this::processString)
                .match(Terminated.class, this::removeNode)
                .build();
    }

    private void removeNode(Terminated sig) {
        log.info("{} 被移除", getSender());
        this.wordCountService.remove(getSender());
    }

    private void processString(String msg) {
        if (msg.equals("ok")) {
            ActorRef sender = getSender();
            log.info("{} 可用", sender);
            wordCountService.add(sender);
            getContext().watch(sender);
        } else if (msg.equals("isOK")) {
            if (wordCountService.isEmpty()) {
                getSender().tell("notOK", getSelf());
            } else {
                getSender().tell("ImOK", getSelf());
            }
        }
    }

    private void processArticle(Article article) {
        jobCount++;
        int backNodeIndex = jobCount % wordCountService.size();
        log.info("选择节点:{}", backNodeIndex);
        wordCountService.get(backNodeIndex).forward(article, getContext());

    }


    @Data
    public static class Article implements Serializable {
        String id;
        String content;
    }

    @Data
    public static class CountResult implements Serializable {
        String id;
        int count;
    }

    public static void main(String[] args) {
        Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2553)
                .withFallback(
                        ConfigFactory.parseString("akka.cluster.roles=[wordFE]"))
                .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("mysys", config);
        ActorRef frontRef = system.actorOf(Props.create(FrontEndService.class), "wordFrontService");

        log.info("前端服务路径:{}", frontRef.path());

        String result = "";
        for (; ; ) {
            Future<Object> future = Patterns.ask(frontRef, "isOK", 1000);
            try {
                result = (String) Await.result(future, Duration.create(1000, "seconds"));
                if (result.equals("ImOK")) {
                    log.info("系统已经OK了");
                    break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        List<Article> arts = new LinkedList<>();
        File dir = new File("/User/macaoyuan/Download");
        File[] files = dir.listFiles();
        assert files != null;
        for (File file : files) {
            Article article = new Article();
            article.setId(file.getName());
            // 为了简化
            article.setContent(file.getAbsolutePath());
            arts.add(article);
        }

        Timeout timeout = new Timeout(Duration.create(3, TimeUnit.SECONDS));
        for (Article art : arts) {
            Patterns.ask(frontRef, art, timeout).onSuccess(new OnSuccess<Object>() {
                @Override
                public void onSuccess(Object result) throws Throwable {
                    CountResult countResult = (CountResult) result;
                    log.info("统计结果:Id{} 数量{}", countResult.getId(), countResult.getCount());
                }
            }, system.dispatcher());
        }

    }
}

Akka Remote

官方推荐最好使用Akka Cluster,而不是直接使用Akka Remote,因为Remote模块主要就是用于给Cluster使用的底层模块,但是对于了解Cluster的细节还是有很大帮助的。

Akka有两种类型的远程方式:

  • 查询:在远端节点上查询是否有对应的actor,actorSelection(path)
  • 创建:在远端节点上创建actor,actorOf(Props(…), actorName)

Akka Remote的使用

  1. 添加依赖
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-remote_2.12</artifactId>
  <version>2.5.23</version>
</dependency>
  1. 配置文件中,开启remote能力,至少要有如下内容
akka {
  actor {
  	# 本地运行的provider为local,远程的即为remote
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    
    # actor运行的主机,用于标记唯一的系统方便通信,端口设置为0的时候,它会自动选择端口
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
 }
}
  1. 查询:
ActorSelection selection =
  context.actorSelection("akka.tcp://app@10.0.0.1:2552/user/serviceA/worker");
selection.tell("Pretty awesome feature", getSelf());
  1. 创建远端Actor,在配置文件中做一些修改:
    当system.actorOf(new Props(…), “sampleActor”)创建sampleActor的时候,actor不会直接的被创建而是到远端的节点上创建,配置文件比较灵活也可以添加白名单,编程方式等处理
akka {
  actor {
    deployment {
      /sampleActor {
        remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"
      }
    }
  }
}

其余的使用方式与普通的actor没有差别