1. Shared Variables
Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and updates to the variables on the remote machines are never propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
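For illustration, a minimal sketch of this behavior, assuming a spark-shell session where sc is the SparkContext and counter is a made-up variable:
// Each task works on its own copy of `counter`; updates made on executors do not
// propagate back to the driver (behavior in local mode may differ).
var counter = 0
val data = sc.parallelize(1 to 10)
data.foreach(x => counter += x)
println(counter)   // typically still 0 on the driver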
2. Broadcast Variables (broadcast)
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
Spark actions are executed through a set of stages, separated by distributed "shuffle" operations. Spark automatically broadcasts the common data needed by tasks within each stage. Data broadcast this way is cached in serialized form and deserialized before each task runs. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.
A broadcast variable is created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below demonstrates this:
- Broadcast variables are distributed in a BitTorrent-like (BT) fashion
- Typical use cases: lookup tables, map-side joins (see the sketch after the REPL session below)
- Caveats: read-only; cached on every worker; not shipped along with each task
- Usage: val broadcastVar = sc.broadcast(Array(1, 2, 3)); broadcastVar.value
# Disable Spark SQL's automatic broadcast join (a threshold of -1 turns it off)
spark.sql.autoBroadcastJoinThreshold=-1
# Broadcasting moves data across the cluster more efficiently than shipping it with every task
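For reference, a hedged sketch of two common ways to apply this setting, assuming a SparkSession named spark:
// Disable automatic broadcast joins for the current session
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Or pass it at submit time:
// spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1 ...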
scala> case class User(name:String){}
defined class User
scala> val bc = sc.broadcast(new User("tom"))
bc: org.apache.spark.broadcast.Broadcast[User] = Broadcast(2)
scala> val rdd = sc.makeRDD(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24
scala> rdd.map(e=>{println(bc.value.name);e}).collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)
scala> bc.value.name
res4: String = tom
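As referenced in the bullet list above, a minimal map-side join sketch with a broadcast lookup table; the userNames map and the orders RDD are made-up illustration data:
// Broadcast a small dimension table and join it on the map side, avoiding a shuffle.
val userNames = Map(1 -> "tom", 2 -> "curry", 3 -> "green")
val bcNames = sc.broadcast(userNames)
val orders = sc.makeRDD(Seq((1, 100.0), (2, 250.0), (1, 80.0)))   // (userId, amount)
val joined = orders.map { case (userId, amount) =>
  (bcNames.value.getOrElse(userId, "unknown"), amount)
}
joined.collect().foreach(println)   // (tom,100.0), (curry,250.0), (tom,80.0)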
3. Accumulators
• Add-only: they support only an add operation
• Similar to counters in MapReduce
• Usage:
// Pre-Spark-2.0 API: sc.accumulator is deprecated since 2.0 in favor of sc.longAccumulator (shown below)
val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value   // 10
scala> val sum = sc.longAccumulator("sum")
sum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 360, name: Some(sum), value: 0)
scala> val rdd = sc.makeRDD(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:24
scala> rdd.map(e=>{sum.add(e+3);e}).collect
res5: Array[Int] = Array(1, 2, 3, 4, 5)
scala> sum.value
res6: Long = 30
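One caveat worth noting: Spark only guarantees that accumulator updates are applied exactly once when they happen inside actions; in transformations such as the map above, a re-executed task may apply its updates again. A minimal sketch of the action-side pattern, with a made-up badRecords counter:
// Count malformed records while processing the data in an action (foreach).
val badRecords = sc.longAccumulator("badRecords")
sc.makeRDD(Seq("1", "2", "x", "4")).foreach { s =>
  if (!s.forall(_.isDigit)) badRecords.add(1)
}
println(badRecords.value)   // 1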
3.1 Custom accumulator class
package com.sbz.accumulator
import scala.collection.mutable.Map
import org.apache.spark.util.AccumulatorV2
/**
 * Created by admin on 2017/4/6.
 * The accumulator must be registered, otherwise a serialization exception is thrown at runtime:
 *   val accumulator = new MyAccumulatorV2()
 *   sc.register(accumulator)
 * Counts word occurrences: IN = String (a word), OUT = String (the count map rendered as a string).
 */
class MyAccumulatorV2 extends AccumulatorV2[String, String] {
  var map = Map[String, Int]()
  var result = ""

  // Whether this accumulator is still in its initial (zero) state.
  override def isZero: Boolean = map.isEmpty && result.isEmpty

  // Create a new accumulator carrying the same state as this one.
  override def copy(): AccumulatorV2[String, String] = {
    val myAccumulator = new MyAccumulatorV2()
    myAccumulator.map = this.map.clone()
    myAccumulator.result = this.result
    myAccumulator
  }

  // Reset this accumulator to its zero state.
  override def reset(): Unit = {
    map.clear()
    result = ""
  }

  // Increment the count for the given word.
  override def add(word: String): Unit = {
    map.update(word, map.getOrElse(word, 0) + 1)
    result = map.toString()
  }

  // Merge the partial counts of another accumulator of the same type into this one.
  override def merge(other: AccumulatorV2[String, String]): Unit = other match {
    case o: MyAccumulatorV2 =>
      o.map.foreach { case (word, count) =>
        map.update(word, map.getOrElse(word, 0) + count)
      }
      result = map.toString()
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // The current value exposed to the driver.
  override def value: String = result
}
// Test class
package com.sbz.accumulator
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Created by admin on 2017/4/6.
 * Test class for the custom accumulator.
 */
object TestMyAccumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("AccumulatorDemo")
    val sc = new SparkContext(conf)
    val accumulator = new MyAccumulatorV2()
    // Register the accumulator before it is used inside a task.
    sc.register(accumulator)
    val rdd = sc.makeRDD(Array("tom", "curry", "green", "curry", "green"))
    // Update the accumulator inside an action so each task's update is applied exactly once.
    rdd.foreach(e => {
      println(e)
      accumulator.add(e)
    })
    println("Accumulator value: " + accumulator.value)
  }
}
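For simple collection-style use cases a custom AccumulatorV2 is not always needed; a hedged sketch of the built-in CollectionAccumulator as an alternative:
// Gather individual values into a java.util.List on the driver instead of aggregating counts.
val seen = sc.collectionAccumulator[String]("seenWords")
sc.makeRDD(Array("tom", "curry", "green", "curry", "green")).foreach(w => seen.add(w))
println(seen.value)   // a java.util.List of the words, order not guaranteed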
3.2 Spark Streaming + broadcast variables + accumulators
package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
object BroadcastAccumulatorStreaming {
  /**
   * Declare a broadcast variable and an accumulator.
   * Note: Accumulator[Int] / sparkContext.accumulator are the pre-Spark-2.0 API
   * (deprecated since 2.0 in favor of LongAccumulator; see the sketch after this listing).
   */
  private var broadcastList: Broadcast[List[String]] = _
  private var accumulator: Accumulator[Int] = _

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
    val sc = new SparkContext(sparkConf)

    /**
     * The Duration is given in milliseconds.
     */
    val ssc = new StreamingContext(sc, Duration(2000))

    // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
    broadcastList = ssc.sparkContext.broadcast(List("Hadoop", "Spark"))
    accumulator = ssc.sparkContext.accumulator(0, "broadcasttest")

    /**
     * Read the input data.
     */
    val lines = ssc.socketTextStream("localhost", 9999)

    /**
     * How the data is processed:
     *
     * 1. flatMap splits each line into words.
     * 2. map turns each word into a tuple (word, 1).
     * 3. reduceByKey sums the values per key.
     *    (optionally sortByKey for a ranking)
     * 4. filter: keep only the words contained in the broadcast list, adding their counts to the accumulator.
     * 5. print the result.
     */
    val words = lines.flatMap(line => line.split(" "))
    val wordpair = words.map(word => (word, 1))
    // Note: the result of this filter is discarded; the effective filtering happens on `pair` below.
    wordpair.filter(record => {broadcastList.value.contains(record._1)})
    val pair = wordpair.reduceByKey(_ + _)

    /**
     * Why foreachRDD here?
     *
     * pair is a DStream (a PairDStream<String, Integer> in the Java API). foreachRDD is an
     * output operation that exposes the underlying RDD of every batch, so arbitrary RDD
     * operations (and accumulator updates) can be applied to it.
     */
    /* pair.foreachRDD(rdd => {
      rdd.filter(record => {
        if (broadcastList.value.contains(record._1)) {
          accumulator.add(1)
          return true
        } else {
          return false
        }
      })
    })*/

    // The accumulator is updated inside a transformation, so a re-executed task may apply
    // its updates more than once.
    val filteredPair = pair.filter(record => {
      if (broadcastList.value.contains(record._1)) {
        accumulator.add(record._2)
        true
      } else {
        false
      }
    })
    filteredPair.print()
    // Note: this line runs once while the streaming graph is being built, before any batch has
    // been processed, so it prints the accumulator's initial value.
    println("Accumulator value: " + accumulator.value)

    // pair.filter(record => {broadcastList.value.contains(record._1)})
    /* val keypair = pair.map(pair => (pair._2, pair._1)) */

    /**
     * If the DStream itself lacks an operator, drop down to RDD operations via transform.
     */
    /* keypair.transform(rdd => {
      rdd.sortByKey(false) // TODO
    })*/

    pair.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
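As a hedged alternative to the deprecated Accumulator[Int] used above, a sketch of the same counting logic with a LongAccumulator updated inside foreachRDD (an output operation). It reuses the ssc, pair, and broadcastList names from the listing and would go before ssc.start():
// Sketch only: non-deprecated accumulator API, updated inside an output operation.
val matchedWords = ssc.sparkContext.longAccumulator("matchedWords")
pair.foreachRDD { rdd =>
  val filtered = rdd.filter(record => broadcastList.value.contains(record._1))
  filtered.foreach { case (_, count) => matchedWords.add(count) }
  println("Matched word count so far: " + matchedWords.value)
}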