我们知道,在Spark中RDD的创建方式大概可以分为三种:
从集合中创建RDD,
从外部存储中创建RDD,
从其他RDD创建
而从集合中创建RDD,Spark主要提供了两种函数:parallelize 和 makeRDD。我们可以先看看这两个函数的声明:
1 2 3 def parallelize [T : ClassTag ]( seq: Seq [T ], numSlices: Int = defaultParallelism): RDD [T ]
1 2 3 4 5 6 7 def makeRDD [T : ClassTag ]( seq: Seq [T ], numSlices: Int = defaultParallelism): RDD [T ] def makeRDD [T : ClassTag ](seq: Seq [(T , Seq [String ])]): RDD [T ]
我们可以从上面看出makeRDD有两种实现,而第一个makeRDD函数接收的参数和parallelize完全相同。其实第一种makeRDD函数实现是依赖了parallelize函数来实现,来看看Spark中是怎么实现这个makeRDD函数的:
1 2 3 4 5 6 def makeRDD [T : ClassTag ]( seq: Seq [T ], numSlices: Int = defaultParallelism): RDD [T ] = withScope { parallelize(seq, numSlices) }
我们可以看出,第一种makeRDD函数完全和parallelize函数一致。 接下来我们看看第二种makeRDD函数实现,他接收的参数类型是SequenceFile[T,Seq[String])],Spark文档的说明是:
Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item. 分发本地Scala集合以形成RDD,每个对象具有一个或多个位置首选项(Spark节点的主机名)。 为每个集合项创建一个新分区。
原来这个函数还提供了位置信息,让我们来看看怎么使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 scala> val rdd1 = sc.parallelize(List (1 ,2 ,3 )) rdd1: org.apache.spark.rdd.RDD [Int ] = ParallelCollectionRDD [10 ] at parallelize at <console>:21 scala> val rdd2 = sc.makeRDD(List (1 ,2 ,3 )) rdd2: org.apache.spark.rdd.RDD [Int ] = ParallelCollectionRDD [11 ] at makeRDD at <console>:21 scala> val seq = List ((1 , List ("spark.com" , "spark1.com" , "spark2.com" )), | (2 , List ("spark.com" , "spark2.com" ))) seq: List [(Int , List [String ])] = List ((1 ,List (spark.com, spark1.com, spark2.com)), (2 ,List (spark.com, spark2.com))) scala> val rdd3 = sc.makeRDD(seq) rdd3: org.apache.spark.rdd.RDD [Int ] = ParallelCollectionRDD [12 ] at makeRDD at <console>:23 scala> rdd3.preferredLocations(rdd3.partitions(1 )) res26: Seq [String ] = List (spark.com, spark2.com) scala> rdd3.preferredLocations(rdd3.partitions(0 )) res27: Seq [String ] = List (spark.com, spark1.com, spark2.com) scala> rdd1.preferredLocations(rdd1.partitions(0 )) res28: Seq [String ] = List ()
我们可以看到,makeRDD函数有两种实现,第一种其实完全和parallelize一致,而第二种实现可以为数据提供位置信息,而除此之外的实现和parallelize函数也是一致的,如下:
1 2 3 4 5 6 7 8 9 10 11 12 def parallelize [T : ClassTag ]( seq: Seq [T ], numSlices: Int = defaultParallelism): RDD [T ] = withScope { assertNotStopped() new ParallelCollectionRDD [T ](this , seq, numSlices, Map [Int , Seq [String ]]()) } def makeRDD [T : ClassTag ](seq: Seq [(T , Seq [String ])]): RDD [T ] = withScope { assertNotStopped() val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap new ParallelCollectionRDD [T ](this , seq.map(_._1), seq.size, indexToPrefs) }
都是返回parallelCollectionRDD,而且这个makeRDD的实现不可以自己制定分区数量,而是固定为seq参数的size大小。