Usage:

```scala
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value
```
```
# Disable Spark SQL's automatic broadcast join (set the threshold to -1)
spark.sql.autoBroadcastJoinThreshold=-1

# A broadcast distributes data across the cluster more efficiently than shipping it with every task
scala> case class User(name:String){}
defined class User

scala> val bc = sc.broadcast(new User("tom"))
bc: org.apache.spark.broadcast.Broadcast[User] = Broadcast(2)

scala> val rdd = sc.makeRDD(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24

scala> rdd.map(e=>{println(bc.value.name);e}).collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)

scala> bc.value.name
res4: String = tom
```
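A typical use of a broadcast variable is a map-side join against a small lookup table. The sketch below is illustrative only; the `userNames` map and the `scores` RDD are made-up data, not part of the original example.

```scala
// Hypothetical small lookup table, broadcast to every executor once
// instead of being serialized into each task's closure.
val userNames = sc.broadcast(Map(1 -> "tom", 2 -> "curry", 3 -> "green"))

val scores = sc.makeRDD(Seq((1, 90), (2, 85), (3, 70)))

// Map-side join: resolve the name locally on each executor.
val named = scores.map { case (id, score) =>
  (userNames.value.getOrElse(id, "unknown"), score)
}
named.collect()  // Array((tom,90), (curry,85), (green,70))
```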
2. Accumulators
• Add-only
• Similar to counters in MapReduce
• Usage:
```scala
// Pre-2.0 accumulator API (deprecated since Spark 2.x):
val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value

// Spark 2.x API (AccumulatorV2):
scala> val sum = sc.longAccumulator("sum")
sum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 360, name: Some(sum), value: 0)

scala> val rdd = sc.makeRDD(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:24

scala> rdd.map(e => { sum.add(e + 3); e }).collect
```
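Note that the last line updates the accumulator inside `map`, a transformation. Spark only guarantees exactly-once accumulator updates for updates performed inside actions; if a stage is retried, updates made in transformations may be applied more than once. A minimal sketch of the safer pattern, using the same `longAccumulator`:

```scala
// Updating inside an action (foreach) keeps the count exact
// even if a failed stage is re-executed.
val sum = sc.longAccumulator("sum")
sc.makeRDD(1 to 5).foreach(e => sum.add(e + 3))
println(sum.value)  // (1+2+3+4+5) + 5*3 = 30
```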
```scala
package com.sbz.accumulator

import org.apache.spark.util.AccumulatorV2

/**
 * Created by admin on 2017/4/6.
 * A custom accumulator must be registered with the SparkContext,
 * otherwise a serialization exception is thrown at runtime:
 *   val accumulator = new MyAccumulatorV2()
 *   sc.register(accumulator)
 */
class MyAccumulatorV2 extends AccumulatorV2[String, String] {

  var map = Map[String, Int]()
  var result = ""

  // Whether this accumulator is still in its initial (zero) state.
  override def isZero: Boolean = map.isEmpty && result.isEmpty

  // Create a copy of this accumulator (each task works on its own copy).
  override def copy(): AccumulatorV2[String, String] = {
    val myAccumulator = new MyAccumulatorV2()
    myAccumulator.map = this.map
    myAccumulator.result = this.result
    myAccumulator
  }

  // Reset this accumulator to its zero state.
  override def reset(): Unit = {
    map = Map[String, Int]()
    result = ""
  }

  // Count one occurrence of the given word.
  override def add(word: String): Unit = {
    map += (word -> (map.getOrElse(word, 0) + 1))
    result = map.toString()
  }

  // Merge the partial counts from another task into this accumulator.
  override def merge(other: AccumulatorV2[String, String]): Unit = other match {
    case o: MyAccumulatorV2 =>
      o.map.foreach { case (word, count) =>
        map += (word -> (map.getOrElse(word, 0) + count))
      }
      result = map.toString()
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: String = result
}
```
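To see how the four methods interact, here is a purely local sketch (no SparkContext involved) that mimics the lifecycle Spark drives for a registered accumulator: one copy per task, `add` on the copies, and `merge` back on the driver. It is only an illustration, not Spark's actual scheduling code.

```scala
// Illustrative only: simulate two tasks, each working on its own copy
// of the registered accumulator, then merge the partial results.
val registered = new MyAccumulatorV2()

val task1 = registered.copy()
Seq("tom", "curry").foreach(task1.add)

val task2 = registered.copy()
Seq("curry", "green").foreach(task2.add)

registered.merge(task1)
registered.merge(task2)
println(registered.value)  // Map(tom -> 1, curry -> 2, green -> 1)
```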
// Test class
```scala
package com.sbz.accumulator

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by admin on 2017/4/6.
 * Test driver for the custom accumulator.
 */
object TestMyAccumulator {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("AccumulatorDemo")
    val sc = new SparkContext(conf)

    // Register the custom accumulator; without this, a serialization
    // exception is thrown when it is first used inside a task.
    val accumulator = new MyAccumulatorV2()
    sc.register(accumulator)

    val rdd = sc.makeRDD(Array("tom", "curry", "green", "curry", "green"))
    rdd.map(e => {
      println(e)
      accumulator.add(e)
      e
    }).collect()

    println("Accumulator value: " + accumulator.value)
  }
}
```
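With the accumulator implemented as above, a local run should end with a line similar to `Accumulator value: Map(tom -> 1, curry -> 2, green -> 2)` (the exact `Map` rendering depends on the Scala version).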