【scala系列】14、Akka并发变成和分布式应用程序开发


1.介绍

AKKA是java虚拟机jvm平台上构建高并发、分布式和容错应用的工具包和运行时. akka用scala语言编写,同时提供了scala和java接口

  • 官网:https://akka.io/
  • ActorSystem:创建并监听actor
  • Actor:执行任务的角色,如果任务执行不过来可以创建下一级actor,执行具体的任务

基于akka分布式技术开发分布式应用程序,分为两种角色:master、worker

1.1 master

  • 接收worker的注册,并将worker注册信息保存
  • 感知worker的上下线
  • 接收worker的会报心跳,更新worker的相关信息
  • 定时检测超时的worker,并将超时的worker从集群中移除

1.2 worker

  • 向master进行注册,加入到集群中去
  • 定时向master汇报心跳

1.3 示意图

示意图

2.代码示例

2.1 导包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->

<dependency>

<groupId>com.typesafe.akka</groupId>

<artifactId>akka-actor_2.11</artifactId>

<version>2.3.16</version>

</dependency>

<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-remote -->

<dependency>

<groupId>com.typesafe.akka</groupId>

<artifactId>akka-remote_2.11</artifactId>

<version>2.3.16</version>

</dependency>

2.2 master

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.util

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

class Master extends Actor {

/** 存放 worker 信息 */
val id2Worker = new mutable.HashMap[String, WorkerInfo]()
/** 存放 workerInfo */
val workers = new mutable.HashSet[WorkerInfo]()
//检测worker时间间隔
val checkWorkerOutTimeIntever = 3000
/** actor启动后会立即执行该方法 */
override def preStart(): Unit = {
//导入定时器
import context.dispatcher
//开启定时任务 parameters=>什么时候开始执行任务,执行间隔,给谁定的任务,做怎么任务
context.system.scheduler.schedule(0 millis, checkWorkerOutTimeIntever millis, self, CheckWorkerTimeOut)
}

/** 用来收发消息的方法,这个方法会被多次执行,只要有消息过来就会被执行 */
override def receive: Receive = {
/** 接收到启动消息 */
case "started" => println("master start up")
//接收worker发送过来的worker消息,保存起来
case RegisterWorker(wokerId, cores, memory) => {
if (!id2Worker.contains(wokerId)) {
//将worker 发送过来的worker信息,进行封装成WorkerInfo 保存到map和set中
val workerInfo = new WorkerInfo(wokerId, cores, memory)
id2Worker.put(wokerId, workerInfo)
workers += workerInfo
//sender 指代消息源,即谁发送过来的消息,就指代谁
sender()!RegisteredWorker
}
}
/**接收worker汇报的心跳信息*/
case HeartBeat(workerId)=>{
//从id2Worker 中取出对应的worker并更新最近一次汇报心跳的时间
if(id2Worker.contains(workerId)){
//取出wokerinfo 信息
val workerInfo = id2Worker(workerId)
//更新最后心跳时间
workerInfo.lastHeartBeatTime = System.currentTimeMillis()
}
}
//检测超时的worker并从列表中移除
case CheckWorkerTimeOut=>{
val currentTime = System.currentTimeMillis()
//过滤出超时的worker
val toRemove = workers.filter(w => currentTime - w.lastHeartBeatTime > checkWorkerOutTimeIntever)
toRemove.foreach(workerInfo =>{
id2Worker.remove(workerInfo.workerId)
workers-=workerInfo
})

println(s"worker of alive is ${workers.size}")
}
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
object Master {
def main(args: Array[String]): Unit = {
val masterHost = "localhost"
val masterPort = "8081"

val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$masterHost"
|akka.remote.netty.tcp.port = "$masterPort"
""".stripMargin
val config = ConfigFactory.parseString(configStr)
//创建ActorSystem
val masterActorSystem = ActorSystem("masterActorSystem", config)
//创建actor
val masterActor = masterActorSystem.actorOf(Props[Master], "masterActor")
//给新创建的master发送一条消息(发送消息用感叹号 "!" 进行发送)
masterActor ! "started"
//将ActorSystem 阻塞在这,不要让其停止掉
masterActorSystem.awaitTermination()
//masterActorSystem.whenTerminated
}
}

2.3 RemoteMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
trait RemoteMessage extends Serializable{}

/**worker 向master 发送消息,注册*/
case class RegisterWorker(var wokerId:String,var cores:Int,var memory:Int) extends RemoteMessage
/**master 向 worker 发送的消息*/
case class RegisteredWorker() extends RemoteMessage

/**worker 给自己发送的消息*/
object SendHeartBeat

/**worker 向master发送心跳消息*/
case class HeartBeat(var workerId:String)
/**master 向自己发送消息 定时检测超时worker*/
object CheckWorkerTimeOut

2.4 Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

class Worker(var cores: Int, var memory: Int) extends Actor {
var masterActor: ActorSelection = null
/** 用来收发消息的方法,这个方法会被多次执行,只要有消息过来就会被执行 */


val workerId = UUID.randomUUID().toString
//心跳时间间隔
val heartBeatIntever = 3000

/** * actor启动后会立即执行该方法 */
override def preStart(): Unit = {
/** 获取masterActorSystem的地址 */
masterActor = context.actorSelection("akka.tcp://masterActorSystem@localhost:8081/user/masterActor")
//启动后立即向master注册信息
masterActor ! RegisterWorker(workerId, cores, memory)
}

override def receive: Receive = {
/** 接收到启动消息 */
case "started" => println("worker start up")

/** 接收到master发送过来的worker注册成功的消息 */
case RegisteredWorker => {
//导入定时器
import context.dispatcher
//开启定时任务 parameters=>什么时候开始执行任务,执行间隔,给谁定的任务,做怎么任务
context.system.scheduler.schedule(0 millis, heartBeatIntever millis, self, SendHeartBeat)
}
/** 向master汇报心跳 */
case SendHeartBeat => {
masterActor!HeartBeat(workerId)
}
}

}

1
2
3
4
5
6
7
8
9
10
11
/**
* 用来封装worker的基本信息
* @param workerId
* @param cores
* @param memory
*/
class WorkerInfo(var workerId:String,var cores:Int,var memory:Int) extends Serializable {
/**worker 最近一次汇报心跳的时间*/
var lastHeartBeatTime:Long = System.currentTimeMillis()
}


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
  目录