1.介绍
Akka是Java虚拟机(JVM)平台上用于构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言编写,同时提供了Scala和Java两套接口。
- 官网:https://akka.io/
- ActorSystem:创建并监听actor
- Actor:执行任务的角色,如果任务执行不过来可以创建下一级actor,执行具体的任务
基于akka分布式技术开发分布式应用程序,分为两种角色:master、worker
1.1 master
- 接收worker的注册,并将worker注册信息保存
- 感知worker的上下线
- 接收worker汇报的心跳,更新worker的相关信息
- 定时检测超时的worker,并将超时的worker从集群中移除
1.2 worker
- 向master进行注册,加入到集群中去
- 定时向master汇报心跳
1.3 示意图

2.代码示例
2.1 导包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| <!-- Akka actor core library; the _2.11 suffix in the artifactId is the Scala binary version it targets --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.11</artifactId> <version>2.3.16</version> </dependency>
<!-- Akka remoting, required for the akka.remote.RemoteActorRefProvider configured by the Master; same version as akka-actor --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.11</artifactId> <version>2.3.16</version> </dependency>
|
2.2 master
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
import java.util

import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

/**
 * Cluster master actor.
 *
 * Handles four messages:
 *  - `"started"`: prints a start-up line.
 *  - `RegisterWorker`: records a previously unknown worker and acknowledges it.
 *  - `HeartBeat`: refreshes the last-heartbeat timestamp of a known worker.
 *  - `CheckWorkerTimeOut`: self-sent periodically; evicts workers whose last
 *    heartbeat is older than `workerTimeoutMillis`.
 */
class Master extends Actor {

  /** Registered workers indexed by worker id. */
  val id2Worker = new mutable.HashMap[String, WorkerInfo]()

  /** The same registered workers as a set; kept in sync with `id2Worker`. */
  val workers = new mutable.HashSet[WorkerInfo]()

  /** Period, in milliseconds, between two timeout sweeps. */
  val checkWorkerOutTimeIntever = 3000

  /**
   * Maximum silence, in milliseconds, tolerated before a worker is evicted.
   *
   * This is deliberately larger than the sweep period: the Worker in this file
   * sends a heartbeat every 3000 ms, so a threshold of exactly 3000 ms lets
   * ordinary scheduler/network jitter evict a live worker. An evicted worker
   * is never re-admitted, because heartbeats from unknown ids are ignored.
   */
  val workerTimeoutMillis: Long = checkWorkerOutTimeIntever * 3L

  // Handle of the periodic sweep so it can be cancelled when the actor stops.
  private var timeoutCheckTask: Option[Cancellable] = None

  /** Starts the periodic timeout sweep. */
  override def preStart(): Unit = {
    import context.dispatcher
    timeoutCheckTask = Some(
      context.system.scheduler.schedule(
        0.millis,
        checkWorkerOutTimeIntever.millis,
        self,
        CheckWorkerTimeOut))
  }

  /**
   * Cancels the periodic sweep. Without this the scheduler keeps sending
   * messages after the actor has stopped, and a restart would add a second
   * timer on top of the first.
   */
  override def postStop(): Unit = {
    timeoutCheckTask.foreach(_.cancel())
    timeoutCheckTask = None
  }

  override def receive: Receive = {
    case "started" =>
      println("master start up")

    case RegisterWorker(wokerId, cores, memory) =>
      // Only the first registration of an id is recorded and acknowledged.
      if (!id2Worker.contains(wokerId)) {
        val workerInfo = new WorkerInfo(wokerId, cores, memory)
        id2Worker.put(wokerId, workerInfo)
        workers += workerInfo
        // The companion object (not an instance) is sent on purpose: the
        // Worker in this file matches on `case RegisteredWorker`.
        sender() ! RegisteredWorker
      }

    case HeartBeat(workerId) =>
      // Heartbeats from unknown (never registered or already evicted) ids are ignored.
      id2Worker.get(workerId).foreach { workerInfo =>
        workerInfo.lastHeartBeatTime = System.currentTimeMillis()
      }

    case CheckWorkerTimeOut =>
      val currentTime = System.currentTimeMillis()
      // `filter` builds a new set, so removing from `workers` below is safe.
      val toRemove = workers.filter(w => currentTime - w.lastHeartBeatTime > workerTimeoutMillis)
      toRemove.foreach { workerInfo =>
        id2Worker.remove(workerInfo.workerId)
        workers -= workerInfo
      }
      println(s"worker of alive is ${workers.size}")
  }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/** Entry point that boots the master actor system with Akka remoting enabled. */
object Master {

  /**
   * Starts the master.
   *
   * @param args optional overrides: `args(0)` is the host name to bind
   *             (default `"localhost"`), `args(1)` is the TCP port
   *             (default `"8081"`). The Worker in this file connects to a
   *             hard-coded `localhost:8081`, so the defaults match it.
   */
  def main(args: Array[String]): Unit = {
    val masterHost = args.lift(0).getOrElse("localhost")
    val masterPort = args.lift(1).getOrElse("8081")

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$masterHost"
         |akka.remote.netty.tcp.port = "$masterPort"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)

    // The system name and actor name form the remote path workers look up:
    // akka.tcp://masterActorSystem@<host>:<port>/user/masterActor
    val masterActorSystem = ActorSystem("masterActorSystem", config)
    val masterActor = masterActorSystem.actorOf(Props[Master], "masterActor")
    masterActor ! "started"

    // Block the main thread until the actor system is shut down.
    masterActorSystem.awaitTermination()
  }
}
|
2.3 RemoteMessage
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/** Marker for messages that travel between actor systems; forces them to be serializable. */
trait RemoteMessage extends Serializable {}

/** Worker -> Master: asks to join the cluster, carrying the worker's id and declared resources. */
case class RegisterWorker(var wokerId: String, var cores: Int, var memory: Int) extends RemoteMessage

/**
 * Master -> Worker: registration acknowledgement.
 *
 * Note: the Master and Worker in this file send and match the bare name
 * `RegisteredWorker`, i.e. this class's companion object, not an instance.
 */
case class RegisteredWorker() extends RemoteMessage

/** Worker-local timer tick telling the worker to send a heartbeat. */
case object SendHeartBeat

/** Worker -> Master: periodic liveness signal. It crosses the network, so it is a RemoteMessage too. */
case class HeartBeat(var workerId: String) extends RemoteMessage

/** Master-local timer tick telling the master to look for timed-out workers. */
case object CheckWorkerTimeOut
|
2.4 Worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Cancellable, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
 * Cluster worker actor.
 *
 * On start it registers itself with the master; once the master acknowledges
 * the registration it sends a `HeartBeat` every `heartBeatIntever` milliseconds.
 *
 * @param cores  core count reported to the master on registration
 * @param memory memory amount reported to the master on registration
 */
class Worker(var cores: Int, var memory: Int) extends Actor {

  /** Selection pointing at the master actor; resolved at construction instead of starting out as null. */
  val masterActor: ActorSelection =
    context.actorSelection("akka.tcp://masterActorSystem@localhost:8081/user/masterActor")

  /** Random id generated once per actor instance. */
  val workerId = UUID.randomUUID().toString

  /** Period, in milliseconds, between two heartbeats. */
  val heartBeatIntever = 3000

  // Handle of the heartbeat timer so it can be cancelled and is never started twice.
  private var heartBeatTask: Option[Cancellable] = None

  /** Sends the registration request to the master. */
  override def preStart(): Unit = {
    masterActor ! RegisterWorker(workerId, cores, memory)
  }

  /** Cancels the heartbeat timer so the scheduler does not keep firing at a stopped actor. */
  override def postStop(): Unit = {
    heartBeatTask.foreach(_.cancel())
    heartBeatTask = None
  }

  override def receive: Receive = {
    case "started" =>
      println("worker start up")

    // The Master in this file replies with the companion object `RegisteredWorker`;
    // an instance `RegisteredWorker()` is accepted as well.
    case RegisteredWorker | _: RegisteredWorker =>
      // Guard against a repeated acknowledgement creating a second timer.
      if (heartBeatTask.isEmpty) {
        import context.dispatcher
        heartBeatTask = Some(
          context.system.scheduler.schedule(
            0.millis,
            heartBeatIntever.millis,
            self,
            SendHeartBeat))
      }

    case SendHeartBeat =>
      masterActor ! HeartBeat(workerId)
  }
}
|
1 2 3 4 5 6 7 8 9 10 11
|
/**
 * Mutable bookkeeping record describing one worker.
 *
 * @param workerId identifier of the worker
 * @param cores    core count associated with the worker
 * @param memory   memory amount associated with the worker
 */
class WorkerInfo(
    var workerId: String,
    var cores: Int,
    var memory: Int
) extends Serializable {

  /** Epoch-millisecond timestamp; initialised to the construction time and updated by the owner. */
  var lastHeartBeatTime: Long = System.currentTimeMillis()
}
|