Akka in JAVA(三)
上两个部分讲了Akka
的基本知识和常见的用法.接下来讲一讲Akka
的远程调用以及集群的使用.因为在现在的项目中,基本上都是分布式的,单个的应用程序都快成为”熊猫”了.因此Akka
的远程以及集群调用就是非常有必要的了.
Remote调用
Akka-Remoting
是采用了P2P(peer-to-peer)的通信方式设计的,也就是端对端的方式.特别是Akka-Remoting不能与网络地址转换和负载均衡一起的工作.
但是,由于Akka
在设计的时候就考虑了远程调用以及分布式的情况.因此,Akka-Remoting
在使用上就非常的简单,几乎等于是透明的,和本地调用几乎相同.除了传递的消息需要可序列化以及创建和查找Actor的时候路径稍有不同外,没有其他的区别了.
远程调用的准备
要在项目中使用Akka-Remoting
非常的简单,只需要引入Maven中的akka-remote
就可以了.
|
|
配置
由于Akka几乎没有特别的为Remoting
提供专门的API,区别仅仅在于配置.因此,接下来就是要修改项目中的akka的配置了:
|
|
我们来看看这个配置和本地的有什么不同.
这个配置文件在akka
的配置项中添加了一个actor
配置项,并指定provider
也就是Actor提供者为akka.remote.RemoteActorRefProvider
,即远程Actor提供者.
然后定义了remote
远程传输方式,使用akka.remote.netty.tcp
即netty
的方式提供服务,服务的IP和端口分别是127.0.0.1
和2552
,就这么简单.而由于是端对端的通信,因此客户端的配置和服务器端的是一样的.
以上只是远程调用的最小的配置,完整的可选配置如下:
|
|
创建远程Actor
通过上面的配置后,在程序里面创建远程的Actor就非常的简单了,基本上感觉不到是创建的远程Actor.
只需要在创建ActorSystem
的时候使用上面所说的配置文件即可,接下来的就和本地的Actor没有任何的区别:
|
|
通过这个actorSystem
创建出来的Actor的路径会是这样的:akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/remoteActor
,即是一个远程的Actor.
当然,除了直接在服务器端创建服务外,还能在客户端远程的要求服务器端创建一个Actor,并保持引用.
要实现这样的功能,同样需要修改配置文件:
|
|
这个配置文件告诉akka,在创建一个/sampleActor
的时候,即system.actorOf()
.指定的actor并不会在本地创建,而是会请求远程的actorSystem创建这个actor.
查找远程Actor
当客户端发布了一个远程的Actor后,客户端就需要调用它.而向它发送消息的先决条件就是要找到这个Actor.
查询远程的Actor也非常的简单.每一个远程的Actor都会有一个它自己的Path.其格式是:akka://<actorsystemname>@<hostname>:<port>/<actor path>
,比如上面所说的akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/remoteActor
.那么获取这个Actor的ActorRef
,就是通过actorFor
或actorSelection
方法传入这个ActorPath
即可.
|
|
接下来的操作就和本地的Actor一模一样了.
序列化
既然是远程调用,那么就涉及到消息的序列化.Akka内置了集中序列化的方式,也提供了序列化的扩展,你可以使用内置的序列化方式,也可以自己实现一个.
要选择使用何种序列化,需要修改配置文件.在akka
一节中配置serializers
选项,指定序列化的实现类:
|
|
配置好这个后,还可以绑定数据类型与序列化方式之间的映射:
|
|
上面这段代码就指定了各种数据类型分别采用不同的序列化方式.
如果觉得Akka内置的序列化方式不满足你的要求,可以自定义一个序列化类(通常并不需要).
这个也比较的简单,需要自定义的序列化类继承自JSerializer
类.然后实现其中的方法:
|
|
远程Actor的路由
由于Akka-remoting
是基于点对点的.因此,并不能很好的使用网络提供的负载均衡等功能.其实,要解决这个问题,我们可以使用Akka
的路由功能.即在配置远程Actor的时候,增加router
的参数.
|
|
那么当向这个/parent/remotePool
发送消息的时候,会轮询的把消息分发到不同的远程服务上,从而实现了高可用和负载均衡.
远程事件
同Akka的本地Actor的生命周期Hook相同,Akka为远程的Actor申明了很多的事件.我们可以监听这些远程调用中发生的事件,也可以订阅这些事件.只需要在ActorSystem.eventStream
中为下面的事件增加注册监听器即可. 需要注意的是如果要订阅任意的远程事件,是订阅RemotingLifecycleEvent
,如果只订阅涉及链接的生命周期,需要订阅akka.remote.AssociationEvent
.
- DisassociatedEvent : 链接结束事件,这个事件包含了链接方向以及参与方的地址.
- AssociatedEvent : 链接成功建立事件,这个事件包含了链接方向以及参与方的地址.
- AssociationErrorEvent : 链接相关错误事件,这个事件包含了链接方向以及参与方的地址以及错误的原因.
- RemotingListenEvent : 远程子系统准备好接受链接时的事件,这个事件包含了链接方向以及参与方的地址.
- RemotingShutdownEvent : 远程子系统被关闭的事件
- RemotingErrorEvent : 远程相关的所有错误
Demo
这里举一个稍微复杂点的例子——单词计数.这个是Hadoop的入门的例子,我们使用Akka配合远程调用来实现一次. 功能是这样的,服务端提供了map/reduce方式的单词数量的计算功能,而服务端提供了文本内容的.
它大概的运行流程是这样的:
- 首先客户端的FileReadActor从文本文件中读取文件.
- 然后通过ClientActor发送给远端的服务端.
- 服务端通过WCMapReduceActor接受客户端发送的消息,并发消息放入优先级MailBox
- WCMapReduceActor把接收到的文本内容分发给MapActor做Map计算
- Map计算把结果都发送给ReduceActor,做汇总reduce计算.
- 最后AggregateActor把计算的结果显示出来.
我们先来看客户端:
FileReadActor.java
|
|
通过InputStreamReader把文本内容一行一个的发送给Sender.
ClientActor.java
|
|
这个Actor重载了preStart()
和postStop()
方法用以记录性能.
ClientMain.java
|
|
这个类里面就使用了远程Actor的查找方法,通过system.actorFor("akka.tcp://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor");
获取到了远程的Actor
client.conf
|
|
接下来我们来看看服务器端:
MyPriorityMailBox.java
|
|
通过这个类,自定了一个无边界的优先级邮箱,这样做的目的是保证DISPLAY_LIST
命令最后的响应.否则会出现文本内容还没有发送完成的情况下,就进行了结果的统计显示了.要使用这个自定义的优先级邮箱,需要在配置文件中进行配置:
server.conf
|
|
MapActor.java
|
|
当这个actor接收到消息后,判断是否是结束标识,如果是就发送消息给reduceActor表示已经结束了.否则就计算这一行中的单词的个数,并把这个个数发送给reduceActor.
ReduceActor.java
|
|
这个Actor接收到消息后,判断是什么消息,如果是结果消息,那么就对结果进行整理,得出某个单词出现的次数,否则就是结束标记,告诉管道Actor统计已经结束.
AggregateActor.java
|
|
这个actor接收到消息后,判断消息的类型,如果是DISPLAY_LIST
标识,那么就打印结果.如果是Boolean
就表示统计完成了,那么就发送消息给客户端.如果是Map
,那么这个就是某一个map/reduce的结果,那么就把这个结果聚合到最终的结果中去.
WCMapReduceActor.java
|
|
消息中转和统管的Actor,它统管了其他的几个Actor,是消息的入口.
WCMapReduceServer.java
|
|
服务端的入口程序,定义了一个50个actor的单词统计服务.并使用轮询模式来分发客服端接收到的统计任务.
以上就是整个DEMO的所有的代码.当执行这个程序后,会在控制台打印:
服务器端:
|
|
客户端:
|
|
到此,我们就实现了通过AKKA-remoting
来进行map/reduce
的简单计算.
Cluster调用
原理
Akka除了remoting远程调用外,还提供了支持去中心化的基于P2P的集群服务,并且不会出现单点故障.Akka的集群是基于Gossip协议实现的,支持服务自动失效检测,能够自动发现出现问题而离开集群的成员节点,通过事件驱动的方式,将状态传播到整个集群的其他成员节点中去.Gossip协议是点对点通信协议的一种,它受社交网络中的流言传播的特点所启发,解决了在超大规模集群下其他方式无法解决的单点等问题.
一个Akka集群是由一组成员节点组成的,每一个成员节点都是通过hostname:port:uid
来唯一标识,并且每一个成员节点间是完全解耦合的.
节点状态
Akka集群内部为集群中的成员定义了6种状态,并提供了状态转换矩阵,这6种状态分别是:
- Joining : 正在加入集群的状态
- Up : 正常提供服务的状态
- Leaving : 正在离开集群的状态
- Down : 节点服务下线的状态
- Exiting : 节点离开状态
- Removed : 节点被删除状态
在Akka集群中的每一个成员节点,都只可能处在这6种状态中的一种中.当节点状态发生变化的时候,会发出节点状态事件.需要注意的是,除了Down和Removed状态外,其他状态是有可能随时变为Down状态的,即节点故障而无法提供服务.处于Down状态的节点如果想要再次加入Akka集群中,需要重新启动,并加入Joining状态,然后才能进行后续状态的变化,加入集群.
故障监控
在Akka集群中,集群的每一个成员节点,都会被其他另外一组节点(默认是5个)所监控,这一组节点会通过心跳来检测被监控的节点是否处于Unreachable状态,如果不可达则这一组节点会将被监控的节点的Unreachable状态向集群中的其他所有节点传播,最终使集群中的每个成员节点都知道被监控的节点已经故障.
Akka集群中任一一个成员节点都有可能成为集群的Leader,这是基于Gossip协议收敛过程得到的确定性结果,并不是通过选举产生,从而避免了单点故障.在Akka集群中,Leader只是一种角色,在各轮Gossip收敛过程中Leader可能是不断变化的.Leader的职责就是让成员节点加入和离开集群.一个成员节点最开始是处于Joining状态,一旦所有其他节点都看到了新加入的该节点,则Leader会设置这个节点的状态为up.如果一个节点安全离开Akka集群,那么这个节点的状态会变为Leaving状态,当Leader看到该节点为Leaving状态,会将其状态修改为Exiting,然后通知所有其他节点,当所有节点看到该节点状态为exiting后,Leader将该节点移除,状态修改为removed状态.
配置
要在项目中使用Akka集群,首先需要的就是在项目的Maven中引入akka-Cluster:
|
|
然后在application.conf
中配置必要的参数:
|
|
上面的配置需要特别注意的就是种子节点
,种子节点是akka集群中的一种特殊的节点角色.
种子节点最主要的用处是提供Cluster的初始化和加入点,同时也为其他节点作为中间联系人.要启动Akka-Cluster就必须配置一些列的种子节点.这些种子节点就是一开始就能够预料到的节点,有节点加入的时候,会等种子节点的返回确认才算是加入成功.
更多的集群配置请参见:config-akka-cluster
集群事件
正如上文所说,当节点发生变化的时候,Leader会发送状态的事件给集群中的所有成员节点.因此,接收和处理这些事件也是非常重要的.
- MemberUp : 成员节点上线
- MemberExited : 成员节点下线
- MemberRemoved : 成员节点被剔除
- UnreachableMember : 成员节点无法到达
- ReachableMember : 成员节点可到达
- LeaderChanged : Leader变化
- RoleLeaderChanged : 角色Leader变化
- ClusterShuttingDown : 集群关闭
- ClusterMetricsChanged :
要说明这些节点的变化可以参考官方给出的最最简单的Akka集群的Demo,它不仅列出了一个最简单的Akka的集群要如何构建,也说明了这几个事件状态的变化.
集群事件Demo
在官方的文档中,编写了一个最简单的Akka-Cluster的例子,这个例子就是启动三个Akka的节点,并且监听了节点的所有事件,接收到事件后,打印出来.
demo6.conf
|
|
SimpleClusterListener.java
|
|
SimpleClusterApp.java
|
|
这个简单的例子启动了三个节点,并创建了一个Actor来监听集群中的各种事件,执行这个Demo,会在控制台打印:
|
|
从日志中就可以看出各种状态的变化
集群实践
阶乘服务Demo
这里通过一个简单的阶乘计算,来展示Akka-Cluster的使用.
这个Demo分为了前台和后台两个部分,前台只用来输入阶乘的大小以及打印计算的结果,后台节点负责真正的阶乘的计算.
demo7.conf
|
|
FactorialResult.java
|
|
FactorialBackend.java
|
|
FactorialBackendMain.java
|
|
FactorialFrontend.java
|
|
FactorialFrontendMain.java
|
|
FactorialApp.java
|
|
通过执行FactorialApp.java
可以启动整个演示. 它创建了3个后台节点和一个前台节点.
前台节点启动后,创建FactorialFrontend
Actor,这个Actor负责发送计算数给后台节点,以及接受计算的结果并打印出来.后台节点的ActorFactorialBackend
负责计算阶乘,并返回结果.
执行这个DEMO后,控制台会打印:
|
|