Spark中parallelize函数和makeRDD函数的区别


我们知道,在Spark中RDD的创建方式大概可以分为三种:

  1. 从集合中创建RDD,
  2. 从外部存储中创建RDD,
  3. 从其他RDD创建

而从集合中创建RDD,Spark主要提供了两种函数:parallelize 和 makeRDD。我们可以先看看这两个函数的声明:

  • parallelize
def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T]
  • makeRDD
//第一种实现 与parallelize相同
def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T]

//第二种实现
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

我们可以从上面看出makeRDD有两种实现,而第一个makeRDD函数接收的参数和parallelize完全相同。其实第一种makeRDD函数实现是依赖了parallelize函数来实现,来看看Spark中是怎么实现这个makeRDD函数的:

def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
    //即 调用parallelize 函数
  parallelize(seq, numSlices)
}

我们可以看出,第一种makeRDD函数完全和parallelize函数一致。
接下来我们看看第二种makeRDD函数实现,他接收的参数类型是SequenceFile[T,Seq[String])],Spark文档的说明是:

Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item.
分发本地Scala集合以形成RDD,每个对象具有一个或多个位置首选项(Spark节点的主机名)。 为每个集合项创建一个新分区。

原来这个函数还提供了位置信息,让我们来看看怎么使用

//使用parallelize 创建rdd
scala> val rdd1 = sc.parallelize(List(1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:21

//使用与与parallelize实现逻辑相同的makeRDD 方法创建rdd
scala> val rdd2 = sc.makeRDD(List(1,2,3))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:21

//创建构造rdd的数据
scala> val seq = List((1, List("spark.com", "spark1.com", "spark2.com")),
     | (2, List("spark.com", "spark2.com")))
seq: List[(Int, List[String])] = List((1,List(spark.com, spark1.com, spark2.com)),
 (2,List(spark.com, spark2.com)))

//使用加分区的makeRDD方法创建rdd  
scala> val rdd3 = sc.makeRDD(seq)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at <console>:23

//获取分区数据
scala> rdd3.preferredLocations(rdd3.partitions(1))
res26: Seq[String] = List(spark.com, spark2.com)

//获取分区数据 
scala> rdd3.preferredLocations(rdd3.partitions(0))
res27: Seq[String] = List(spark.com, spark1.com, spark2.com)

//获取分区数据
scala> rdd1.preferredLocations(rdd1.partitions(0))
res28: Seq[String] = List()

我们可以看到,makeRDD函数有两种实现,第一种其实完全和parallelize一致,而第二种实现可以为数据提供位置信息,而除此之外的实现和parallelize函数也是一致的,如下:

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
  assertNotStopped()
  val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
  new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
}

都是返回parallelCollectionRDD,而且这个makeRDD的实现不可以自己制定分区数量,而是固定为seq参数的size大小。


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
JVM 系列(一)、 Java 类加载机制 JVM 系列(一)、 Java 类加载机制
1. JVM 概况介绍1.1 JVM 是什么?JVM 是 Java Virtual Machine( Java 虚拟机 )的缩写, JVM 是一种用于计算设备的规范,它是一个虚构出来的计算机,是通过在实际的计算机上仿真模拟
2019-12-20
下一篇 
SparkSQL读取Excel数据源——spark-excel SparkSQL读取Excel数据源——spark-excel
在Github上面看到一个针对SparkSQL加载Excel数据源的开源组件,拿过来测试了一下spark-excel 1.Spark Excel库用于使用Apache Spark查询Excel文件的库,用于Spark SQL和DataFra
2019-12-13
  目录