Akka in JAVA(四)
最后这个部分讲一讲AKKA中的事件消息类型. 在Akka中主要是有三种消息类型,每一种类型对应了不同的使用场景.他们分别是:Fire and Forget模式
,Send and Receive模式
和Publisher-Subscriber模式
.
Fire and Forget模式
这种发送消息的模式是Akka中所推荐的,也是我们前面一直在使用的方式.也就是单向消息模式,Actor在发送消息之后,并不需要获取响应.这种方式在JAVA中需要使用ActorRef
或ActorSelection
的tell
方法.和消息队列类似,直接调用该方法即可,程序不会阻塞,会直接执行后面的操作,但是消息已经发送给目标的Actor了.这种方式的具体使用方法前面已经列举了很多了,这里就不再重复的举例了.
Send and Receive模式
这种发送消息的模式是双向的,当Actor在发送消息之后,会接收到一个Future对象.和JAVA的Future一样,通过这个可以异步的接收到对方的结果消息.
在整个AKKA中,提供了一套完整的Future机制,不光是在Actor传递消息间可以使用,也可以在非Actor中直接使用.
在代码中用来异步运算
在一般的代码中,我们除了可以直接使用JAVA提供了Future
机制实现异步处理外,还可以使用Akka
提供了Future
机制来实现异步处理,这样的好处在于可以直接的与Akka做无缝的集成.
与Future
相关的类和方法都在akka.dispatch
和akka.pattern.Patterns
中.
比如我们来看一个很简单的例子:
|
|
这个例子使用Futures.future
创建了一个异步的代码执行块.然后可以指定Future
的onSuccess
和onFailure
等状态的响应.当Future执行到这些状态的时候,就会执行响应代码中的方法.
比如这个例子中,在Futures.future
异步执行体中随机的返回正常结果或抛出异常.然后增加了future
成功和失败的两个状态的处理,分别是打印正常的结果和打印失败的异常.当我们执行这段代码后,由于异步代码块中休眠了10毫秒,因此必然会先打印这个地方是外面
这句话,这也证明了这些代码全是异步执行的.然后随机的会打印成功或失败的信息.
除了可以给Future
设定onSuccess
onFailure
和onComplete
外,还有一个很有用的功能就是可以设定Future
的后续操作,也就是多个Future
联合操作.比如接着上一个例子中的:
|
|
当执行完f
的onSuccess
方法后,接下来会执行第一个andThen
中所指定的异步操作,而后继续执行第二个andThen
中的操作.通过这样的操作,就可以形成一条异步调用链.
除此之外,Future
还有很多有用的方法,比如foreach
transform
map
filter
等等.这些个方法和JDK1.8中的StreamAPI
非常的相似.具体的可以参考这里.
在Actor中用来发送消息
除了直接在代码中使用Future
功能来使用异步操作外,另外一个用法就是真正的在Actor中进行消息的传递了. 在Akka in JAVA(一)中我们已经演示过通过MailInbox
的方式来接收消息的反馈,而使用Akka的Future
方式进行Send and Receive模式
的消息传递是第二种方式.
Akka提供了ask
和pipe
两个方法来实现Send and Receive模式
的消息通信.
ask
方法是Patterns
类提供的.它最常用的方法签名是Future<Object> ask(ActorRef actor, Object message, Timeout timeout)
.也就是像某一个或某一组Actor发送一个消息,并设定一个超时时间,它返回一个Future
对象.这样就可以像上面的例子那样的设定响应的事件处理了. 但是需要注意的是,ask的被调用Actor必须在onReceive
方法中显示的调用getSender().tell(xxxx,getSelf())
发送Response为返回的Future填充数据.
同时,Akka还提供了一个Await.result
方法来阻塞的获取Future
的结果.比如:
|
|
而pipe
方法同样是Patterns
类提供的.它的最主要的作用是把一个Future
的结果发送给某一个Actor.也就是指定当某一个future
执行结束后,把结果发送给某个Actor.
在我们的上一篇博客的例子中FactorialBackend
类就使用了这个语法:
|
|
它使用future
函数,异步的计算阶乘.并且把计算结果进行封装处理.然后通过pipe
方法,把这个异步的结果尝试发送给getSender()
,从而达到整个代码完全非阻塞的结果.
Publisher-Subscriber模式
这种模式也就是消息的订阅和发布.用处非常的广泛,比如一个Publisher和多个Subscriber的组合应用,形成事件的广播,前者会将消息同时发送给所有的订阅者,实现分布式的并行处理.比如针对订单的处理,当用户下了订单后,既要生成订单数据,又要通知库存还要通知卖方和买房.于是,就可以将这些不同的任务交给不同的Subscriber,当接收到消息后,同时对订单进行处理.另外,这种模式还可以对Actor的生命周期进行完整的监听,当Actor的节点成员发生变化后,其他节点可以及时的进行各种处理.
具体的例子,可以参考上一篇博客中的SimpleClusterListener
,它就是在preStart
方法中调用了cluster.subscribe
方法订阅了集群中的MemberEvent.class
UnreachableMember.class
两个事件.
除了监听集群中的状态外,还可以通过调用system.eventStream()
来获取消息总线,从而调用subscribe
和publish
方法来发布和订阅消息.
Demo
这里使用一个稍微复杂点的例子来对Akka做一个简单的总结.它涉及到了Akka中的远程调用,本地调用,集群调用.使用了Future
Publisher-Subscriber
等特性,算是一个比较全面的例子.
这个例子原载于http://shiyanjun.cn/archives/1186.html上,是使用scala
写的,我用JAVA
重写了一次,并增加了一些东西.这个例子主要的功能是实现一个简单的模拟日志实时处理的集群系统,类似于Flume
,可以从某一个数据源中输入数据,然后程序收集数据,然后经过一个拦截器层,处理数据,并且转换为特定的格式,最后把数据写入Kafka
中.具体的逻辑如下图:
在上图中,将日志处理系统分为了3个子部分,通过Akka的Role来进行划分,3个角色分别为collector
收集器,interceptor
拦截器,processor
处理器,3个子系统中的节点都是整个Akka集群中的成员.整个集群系统的数据流向是:collector
接收数据,然后将数据发送到interceptor
,interceptor
接收到数据后,解析出真实IP地址,拦截存在于黑名单中的IP请求,如果IP地址不在黑名单,则发送给processor
去转换为最终的模型,然后保存到Kafka中.
首先,我们需要定义的是几个子系统间传递的消息:
EventMessages.java
|
|
然后就是抽象出来的订阅集群事件相关的逻辑.
ClusterRoledWorker.java
|
|
然后,就是对整个Akka的集群进行配置:
demo8.conf
|
|
接下来就是Collector的实现,它是一个Acotr,继承自ClusterRoledWorker抽象类:
EventCollector.java
|
|
然后就是Interceptor的实现,和Collector类似,同样是继承自ClusterRoledWorker
抽象类:
EventInterceptor.java
|
|
然后就是Interceptor这个子系统的启动器:
EventInterceptorMain.java
|
|
在接着就是Processor的实现了:
EventProcessor.java
|
|
上面涉及到kafka消息的发送,我使用了一个工具类来进行消息的发送:
KafkaTemplate.java
|
|
同样需要Processor子系统的启动器:
EventProcessorMain.java
|
|
最后就是需要写一个模拟的发送日志的客户端了,并且包含了Collector子系统的启动器:
EventClient.java
|
|
最后就是整个Demo的入口:
Demo8App.java
|
|
当执行了这个Demo后,首先会看到的就是和上一篇博客中一样的集群启动和加入的消息.而后会看到每5秒接收到一批Nginx的日志,并发送给Kafka.这时可以通过Kafka的客户端看到这些处理后的日志:
|
|
总结
经过这四篇的博客,简单的介绍了一下Akka是什么,大概的用处是什么,以及Akka在JAVA中该如何的使用.由于篇幅以及本人经验的原因,还有很多都没有讲到,以后如有涉及,再慢慢的补上.总之,Akka是一个非常强大的框架,在现在大数据,高性能,分布式的环境下可以发挥很多的作用,各位可以试一试.
PS:本系列文章中所有的Demo的源码已上传至GitHub中.