Spark累加器与广播变量介绍


1. 共享变量

通常,当传递给Spark操作(如map或reduce)的函数在远程集群节点上执行时,它可以在函数中使用的所有变量的单独副本上工作。这些变量被复制到每个机器,并且远程机器上的变量的更新不会传播回驱动程序。在任务之间支持一般的,读写共享的变量是低效的。然而,Spark 为两种常用的使用模式提供了两种有限类型的共享变量:广播变量和累加器。

2. 广播变量 broadcast

广播变量允许程序员在每台机器上保留只读变量,而不是使用任务运送它的副本。例如,可以使用它们以有效的方式为每个节点提供大型输入数据集的副本。Spark还尝试使用高效的广播算法分发广播变量,以降低通信成本。

Spark动作通过一组分阶段执行,由分散的“随机播放”操作隔开。Spark自动广播每个阶段任务所需的通用数据。以这种方式广播的数据以序列化形式进行缓存,并在运行每个任务之前进行反序列化。这意味着,显式创建广播变量仅在跨多个阶段的任务需要相同数据或者以反序列化格式缓存数据很重要时才有用。

广播变量通过调用从变量创建SparkContext.broadcast(v)。广播变量是一个包装器 v,它的值可以通过调用该value 方法来访问。下面的代码显示如下

  • BT 形式的广播变量
  • 使用场景 : lookup表, mapside join
  • 注意点 : 只读,存于每台worker的cache,不随task发送
  • 使用方式 : val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar.value
# 关闭默认广播
spark.sql.autoBroadcastJoinThreshold=-1

# 高效在集群传输数据,比task携带要高
scala> case class User(name:String){}
defined class User

scala> val bc = sc.broadcast(new User("tom"))
bc: org.apache.spark.broadcast.Broadcast[User] = Broadcast(2)

scala> val rdd = sc.makeRDD(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at :24

scala> rdd.map(e=>{println(bc.value.name);e}).collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)

scala> bc.value.name
res4: String = tom 

2. 累加器

• 只增

• 类似于 MapReduce 中的 counter

• 用法 :

  val accum = sc.accumulator(0) 
  sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 
  accum.value

  scala> val sum = sc.longAccumulator("sum")
  sum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 360, name: Some(sum), value: 0)

  scala> val rdd = sc.makeRDD(1 to 5)

  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at :24

  scala> rdd.map(e=>{sum.add(e+3);e}).collect

  res5: Array[Int] = Array(1, 2, 3, 4, 5)                     

  scala> sum.value
  res6: Long = 30

2.1 自定义累加器类

package com.sbz.accumulator

import scala.collection.mutable.Map
import org.apache.spark.util.AccumulatorV2

/**
  * Created by admin on 2017/4/6.
  * 需要注册,不然在运行过程中,会抛出一个序列化异常。
  * val accumulator = new My2AccumulatorV2()
    sc.register(accumulator)
  */
class MyAccumulatorV2 extends AccumulatorV2[String,String]{

  var map = Map[String,Int]()
  var result = ""
  //当AccumulatorV2中存在类似数据不存在这种问题时,是否结束程序。
  override def isZero: Boolean = {
    true
  }
  //拷贝一个新的AccumulatorV2
  override def copy(): AccumulatorV2[String, String] = {
    val myAccumulator = new MyAccumulatorV2()
    myAccumulator.result = this.result
    myAccumulator
  }

  override def reset(): Unit = {
    result = ""
  }

  override def add(word: String): Unit = {

    if(map.getOrElse(word,0)==0){
      map+=(word->1)
    }else{
      map.update(word,map.get(word).get+1)
    }
    result = map.toString()
    result
  }

  override def merge(other: AccumulatorV2[String, String]) = other match {
    case map: MyAccumulatorV2 =>
      result = other.value
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: String = {
    result
  }
}

// 测试类

package com.sbz.accumulator

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by admin on 2017/4/6.
  * 自定义累加器测试类
  */
object TestMyAccumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("AccumulatorDeno")
    val sc = new SparkContext(conf)
    val accumulator = new MyAccumulatorV2()
    sc.register(accumulator)
    val rdd = sc.makeRDD(Array("tom","curry","green","curry","green"))
    rdd.map(e=>{
      println(e)
      accumulator.add(e)
    }).collect()
    println("累加器的值为:"+accumulator.value)
  }
}

3. Spark-Streaming+广播变量+累加器

package com.Streaming

import java.util

import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast

object BroadcastAccumulatorStreaming {

  /**
    * 声明一个广播和累加器!
    */
  private var broadcastList:Broadcast[List[String]]  = _
  private var accumulator:Accumulator[Int] = _

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
    val sc = new SparkContext(sparkConf)
    /**
      * duration是ms
      */
    val ssc = new StreamingContext(sc,Duration(2000))
   // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
    broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
    accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")

    /**
      * 获取数据!
      */
    val lines = ssc.socketTextStream("localhost",9999)

    /**
      * 拿到数据后 怎么处理!
      *
      * 1.flatmap把行分割成词。
      * 2.map把词变成tuple(word,1)
      * 3.reducebykey累加value
      * (4.sortBykey排名)
      * 4.进行过滤。 value是否在累加器中。
      * 5.打印显示。
      */
    val words = lines.flatMap(line => line.split(" "))

    val wordpair = words.map(word => (word,1))

    wordpair.filter(record => {broadcastList.value.contains(record._1)})


    val pair = wordpair.reduceByKey(_+_)

    /**
      *这步为什么要先foreachRDD?
      *
      * 因为这个pair 是PairDStream<String, Integer>
      *
      *   进行foreachRDD是为了?
      *
      */
/*    pair.foreachRDD(rdd => {
      rdd.filter(record => {

        if (broadcastList.value.contains(record._1)) {
          accumulator.add(1)
          return true
        } else {
          return false
        }

      })

    })*/

    val filtedpair = pair.filter(record => {
        if (broadcastList.value.contains(record._1)) {
          accumulator.add(record._2)
          true
        } else {
          false
        }

     }).print

    println("累加器的值"+accumulator.value)

   // pair.filter(record => {broadcastList.value.contains(record._1)})

   /* val keypair = pair.map(pair => (pair._2,pair._1))*/

    /**
      * 如果DStream自己没有某个算子操作。就通过转化transform!
      */
   /* keypair.transform(rdd => {
      rdd.sortByKey(false)//TODO
    })*/
    pair.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
metadata.SessionHiveMetaStoreClient metadata.SessionHiveMetaStoreClient
1. 异常信息Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apac
2017-05-04
下一篇 
Phoenix05 索引介绍 Phoenix05 索引介绍
1. 介绍在HBase中,只有一个单一的按照字典序排序的 rowKey 索引,当使用 rowKey 来进行数据查询的时候速度较快,但是如果不使用rowKey 来查询的话就会使用 filter 来对全表进行扫描,很大程度上降低了检索性能。而
2017-03-25
  目录