【scala系列】14、Akka并发变成和分布式应用程序开发


1.介绍

AKKA是java虚拟机jvm平台上构建高并发、分布式和容错应用的工具包和运行时. akka用scala语言编写,同时提供了scala和java接口

  • 官网:https://akka.io/
  • ActorSystem:创建并监听actor
  • Actor:执行任务的角色,如果任务执行不过来可以创建下一级actor,执行具体的任务

基于akka分布式技术开发分布式应用程序,分为两种角色:master、worker

1.1 master

  • 接收worker的注册,并将worker注册信息保存
  • 感知worker的上下线
  • 接收worker的会报心跳,更新worker的相关信息
  • 定时检测超时的worker,并将超时的worker从集群中移除

1.2 worker

  • 向master进行注册,加入到集群中去
  • 定时向master汇报心跳

1.3 示意图

示意图

2.代码示例

2.1 导包

<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->

<dependency>

    <groupId>com.typesafe.akka</groupId>

    <artifactId>akka-actor_2.11</artifactId>

    <version>2.3.16</version>

</dependency>

<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-remote -->

<dependency>

    <groupId>com.typesafe.akka</groupId>

    <artifactId>akka-remote_2.11</artifactId>

    <version>2.3.16</version>

</dependency>

2.2 master

import java.util

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

class Master extends Actor {

  /** 存放 worker 信息 */
  val id2Worker = new mutable.HashMap[String, WorkerInfo]()
  /** 存放 workerInfo */
  val workers = new mutable.HashSet[WorkerInfo]()
  //检测worker时间间隔
  val checkWorkerOutTimeIntever = 3000
  /** actor启动后会立即执行该方法 */
  override def preStart(): Unit = {
    //导入定时器
    import context.dispatcher
    //开启定时任务 parameters=>什么时候开始执行任务,执行间隔,给谁定的任务,做怎么任务
    context.system.scheduler.schedule(0 millis, checkWorkerOutTimeIntever millis, self, CheckWorkerTimeOut)
  }

  /** 用来收发消息的方法,这个方法会被多次执行,只要有消息过来就会被执行 */
  override def receive: Receive = {
    /** 接收到启动消息 */
    case "started" => println("master start up")
    //接收worker发送过来的worker消息,保存起来
    case RegisterWorker(wokerId, cores, memory) => {
      if (!id2Worker.contains(wokerId)) {
        //将worker 发送过来的worker信息,进行封装成WorkerInfo 保存到map和set中
        val workerInfo = new WorkerInfo(wokerId, cores, memory)
        id2Worker.put(wokerId, workerInfo)
        workers += workerInfo
        //sender  指代消息源,即谁发送过来的消息,就指代谁
        sender()!RegisteredWorker
      }
    }
    /**接收worker汇报的心跳信息*/
    case HeartBeat(workerId)=>{
    //从id2Worker 中取出对应的worker并更新最近一次汇报心跳的时间
      if(id2Worker.contains(workerId)){
        //取出wokerinfo 信息
        val workerInfo = id2Worker(workerId)
        //更新最后心跳时间
        workerInfo.lastHeartBeatTime = System.currentTimeMillis()
      }
    }
      //检测超时的worker并从列表中移除
    case CheckWorkerTimeOut=>{
      val currentTime = System.currentTimeMillis()
      //过滤出超时的worker
      val toRemove = workers.filter(w => currentTime - w.lastHeartBeatTime > checkWorkerOutTimeIntever)
      toRemove.foreach(workerInfo =>{
        id2Worker.remove(workerInfo.workerId)
        workers-=workerInfo
      })

      println(s"worker of alive is ${workers.size}")
    }
  }

}
object Master {
  def main(args: Array[String]): Unit = {
    val masterHost = "localhost"
    val masterPort = "8081"

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$masterHost"
         |akka.remote.netty.tcp.port = "$masterPort"
      """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //创建ActorSystem
    val masterActorSystem = ActorSystem("masterActorSystem", config)
    //创建actor
    val masterActor = masterActorSystem.actorOf(Props[Master], "masterActor")
    //给新创建的master发送一条消息(发送消息用感叹号 "!" 进行发送)
    masterActor ! "started"
    //将ActorSystem 阻塞在这,不要让其停止掉
    masterActorSystem.awaitTermination()
    //masterActorSystem.whenTerminated
  }
}

2.3 RemoteMessage

trait RemoteMessage extends Serializable{}

/**worker 向master 发送消息,注册*/
case class RegisterWorker(var wokerId:String,var cores:Int,var memory:Int) extends RemoteMessage
/**master 向 worker 发送的消息*/
case class RegisteredWorker() extends  RemoteMessage

/**worker 给自己发送的消息*/
object SendHeartBeat

/**worker 向master发送心跳消息*/
case class HeartBeat(var workerId:String)
/**master 向自己发送消息 定时检测超时worker*/
object CheckWorkerTimeOut

2.4 Worker

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

class Worker(var cores: Int, var memory: Int) extends Actor {
  var masterActor: ActorSelection = null
  /** 用来收发消息的方法,这个方法会被多次执行,只要有消息过来就会被执行 */


  val workerId = UUID.randomUUID().toString
  //心跳时间间隔
  val heartBeatIntever = 3000

  /** * actor启动后会立即执行该方法 */
  override def preStart(): Unit = {
    /** 获取masterActorSystem的地址 */
    masterActor = context.actorSelection("akka.tcp://masterActorSystem@localhost:8081/user/masterActor")
    //启动后立即向master注册信息
    masterActor ! RegisterWorker(workerId, cores, memory)
  }

  override def receive: Receive = {
    /** 接收到启动消息 */
    case "started" => println("worker start up")

    /** 接收到master发送过来的worker注册成功的消息 */
    case RegisteredWorker => {
      //导入定时器
      import context.dispatcher
      //开启定时任务 parameters=>什么时候开始执行任务,执行间隔,给谁定的任务,做怎么任务
      context.system.scheduler.schedule(0 millis, heartBeatIntever millis, self, SendHeartBeat)
    }
    /** 向master汇报心跳 */
    case SendHeartBeat => {
      masterActor!HeartBeat(workerId)
    }
  }

}
/**
  * 用来封装worker的基本信息
  * @param workerId
  * @param cores
  * @param memory
  */
class WorkerInfo(var workerId:String,var cores:Int,var memory:Int) extends Serializable {
  /**worker 最近一次汇报心跳的时间*/
  var lastHeartBeatTime:Long = System.currentTimeMillis()
}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Flink系列 1. 什么是Flink ? Flink系列 1. 什么是Flink ?
1. Flink 是什么?Apache Flink 是一个分布式大数据处理引擎,可对有界数据流和无界数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计算。 Apache Flink 功能强大,支持开发和运
2020-04-25
下一篇 
【scala系列】13、Scala IO 、 XML 【scala系列】13、Scala IO 、 XML
1. IOimport java.io._, java.nio._ object IODemo{ def main(args: Array[String]): Unit = { val path = "D:\\data\\oo
2020-04-20
  目录