1.介绍
AKKA是java虚拟机jvm平台上构建高并发、分布式和容错应用的工具包和运行时. akka用scala语言编写,同时提供了scala和java接口
- 官网:https://akka.io/
- ActorSystem:创建并监听actor
- Actor:执行任务的角色,如果任务执行不过来可以创建下一级actor,执行具体的任务
基于akka分布式技术开发分布式应用程序,分为两种角色:master、worker
1.1 master
- 接收worker的注册,并将worker注册信息保存
- 感知worker的上下线
- 接收worker的会报心跳,更新worker的相关信息
- 定时检测超时的worker,并将超时的worker从集群中移除
1.2 worker
- 向master进行注册,加入到集群中去
- 定时向master汇报心跳
1.3 示意图
2.代码示例
2.1 导包
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.16</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.3.16</version>
</dependency>
2.2 master
import java.util
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
import scala.concurrent.duration._
class Master extends Actor {
/** 存放 worker 信息 */
val id2Worker = new mutable.HashMap[String, WorkerInfo]()
/** 存放 workerInfo */
val workers = new mutable.HashSet[WorkerInfo]()
//检测worker时间间隔
val checkWorkerOutTimeIntever = 3000
/** actor启动后会立即执行该方法 */
override def preStart(): Unit = {
//导入定时器
import context.dispatcher
//开启定时任务 parameters=>什么时候开始执行任务,执行间隔,给谁定的任务,做怎么任务
context.system.scheduler.schedule(0 millis, checkWorkerOutTimeIntever millis, self, CheckWorkerTimeOut)
}
/** 用来收发消息的方法,这个方法会被多次执行,只要有消息过来就会被执行 */
override def receive: Receive = {
/** 接收到启动消息 */
case "started" => println("master start up")
//接收worker发送过来的worker消息,保存起来
case RegisterWorker(wokerId, cores, memory) => {
if (!id2Worker.contains(wokerId)) {
//将worker 发送过来的worker信息,进行封装成WorkerInfo 保存到map和set中
val workerInfo = new WorkerInfo(wokerId, cores, memory)
id2Worker.put(wokerId, workerInfo)
workers += workerInfo
//sender 指代消息源,即谁发送过来的消息,就指代谁
sender()!RegisteredWorker
}
}
/**接收worker汇报的心跳信息*/
case HeartBeat(workerId)=>{
//从id2Worker 中取出对应的worker并更新最近一次汇报心跳的时间
if(id2Worker.contains(workerId)){
//取出wokerinfo 信息
val workerInfo = id2Worker(workerId)
//更新最后心跳时间
workerInfo.lastHeartBeatTime = System.currentTimeMillis()
}
}
//检测超时的worker并从列表中移除
case CheckWorkerTimeOut=>{
val currentTime = System.currentTimeMillis()
//过滤出超时的worker
val toRemove = workers.filter(w => currentTime - w.lastHeartBeatTime > checkWorkerOutTimeIntever)
toRemove.foreach(workerInfo =>{
id2Worker.remove(workerInfo.workerId)
workers-=workerInfo
})
println(s"worker of alive is ${workers.size}")
}
}
}
object Master {
def main(args: Array[String]): Unit = {
val masterHost = "localhost"
val masterPort = "8081"
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$masterHost"
|akka.remote.netty.tcp.port = "$masterPort"
""".stripMargin
val config = ConfigFactory.parseString(configStr)
//创建ActorSystem
val masterActorSystem = ActorSystem("masterActorSystem", config)
//创建actor
val masterActor = masterActorSystem.actorOf(Props[Master], "masterActor")
//给新创建的master发送一条消息(发送消息用感叹号 "!" 进行发送)
masterActor ! "started"
//将ActorSystem 阻塞在这,不要让其停止掉
masterActorSystem.awaitTermination()
//masterActorSystem.whenTerminated
}
}
2.3 RemoteMessage
trait RemoteMessage extends Serializable{}
/**worker 向master 发送消息,注册*/
case class RegisterWorker(var wokerId:String,var cores:Int,var memory:Int) extends RemoteMessage
/**master 向 worker 发送的消息*/
case class RegisteredWorker() extends RemoteMessage
/**worker 给自己发送的消息*/
object SendHeartBeat
/**worker 向master发送心跳消息*/
case class HeartBeat(var workerId:String)
/**master 向自己发送消息 定时检测超时worker*/
object CheckWorkerTimeOut
2.4 Worker
import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
class Worker(var cores: Int, var memory: Int) extends Actor {
var masterActor: ActorSelection = null
/** 用来收发消息的方法,这个方法会被多次执行,只要有消息过来就会被执行 */
val workerId = UUID.randomUUID().toString
//心跳时间间隔
val heartBeatIntever = 3000
/** * actor启动后会立即执行该方法 */
override def preStart(): Unit = {
/** 获取masterActorSystem的地址 */
masterActor = context.actorSelection("akka.tcp://masterActorSystem@localhost:8081/user/masterActor")
//启动后立即向master注册信息
masterActor ! RegisterWorker(workerId, cores, memory)
}
override def receive: Receive = {
/** 接收到启动消息 */
case "started" => println("worker start up")
/** 接收到master发送过来的worker注册成功的消息 */
case RegisteredWorker => {
//导入定时器
import context.dispatcher
//开启定时任务 parameters=>什么时候开始执行任务,执行间隔,给谁定的任务,做怎么任务
context.system.scheduler.schedule(0 millis, heartBeatIntever millis, self, SendHeartBeat)
}
/** 向master汇报心跳 */
case SendHeartBeat => {
masterActor!HeartBeat(workerId)
}
}
}
/**
* 用来封装worker的基本信息
* @param workerId
* @param cores
* @param memory
*/
class WorkerInfo(var workerId:String,var cores:Int,var memory:Int) extends Serializable {
/**worker 最近一次汇报心跳的时间*/
var lastHeartBeatTime:Long = System.currentTimeMillis()
}