Spark中RDD的map/foreach/mapPartition/foreachPartition操作
在Spark中对RDD进行循环操作是一种非常常见的操作,正常情况下循环包含两种方法,一种是map()操作,一种是foreach()操作。这两者都是对RDD数据进行循环,但是前者属于transformation操作,后者是action操作。我们知道,在Spark中,所有的 transformations 都是懒加载的,不会马上计算它们的结果,而是仅仅记录转换操作是应用到哪些基础数据集上的,只有当 actions 要返回结果的时候计算才会发生。因此,RDD的map操作不会立即生效。RDD的transformation操作会产生一组新的RDD数据,但是action是在原有数据上进行操作,并不返回新数据。因此,通常foreach是操作一些不需要返回值的方法,或者返回值为空的方法,如向外部系统中传递数据等。而map操作则可以返回一个新的RDD,因此适合你需要对数据进行处理并计算得到新的结果。
我们以几个例子来说明:
//首先我们创建一个向量
val a = sc.parallelize(0 to 10)
//用map方式循环a,对其中每个元素+1
val b = a.map(_+1)
//用foreach方式循环a,对其中每个元素+1
val c = a.foreach(_+1)
//用foreach方式循环打印b
b.foreach(println)
//用foreach方式循环打印c
c.foreach(println)
//用map方式循环打印b
b.map(println)
//用map方式循环打印c
c.map(println)
执行上述语句,我们应该会依次得到如下结果:
//这是创建a成功的输出,表明a是一个Spark的RDD数据
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
//用map方式循环a,对其中每个元素+1,结果给b。这里只显示了map的结果是一个RDD
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:25
//用foreach方式循环a,对其中每个元素+1,结果给c。这里现实了foreach的结果是Unit,因为foreach并不返回新的RDD,实质返回的是空
c:Unit
//用foreach方式循环打印b,由于b是一个RDD,因此这里有结果
3
4
1
6
2
5
9
7
10
8
11
//用foreach方式循环打印c,由于c是一个Unit,因此这里报错
<console>:26:error:value foreach is not a member of Unit
//用map方式循环打印b,由于map是一个transformation操作,所以这里不会打印出结果,只是记录下操作逻辑
res14:org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[8] at map at <console>:26
//用map方式循环打印c,同上原理,出错
<console>:26:error:value foreach is not a member of Unit
对于map和foreach操作都有对应的mapPartitions和foreachPartition操作,它们是基于分区上的map和foreach操作。可以这么理解,map和foreach针对每一行数据进行循环,而mapPartitions和foreachPartition是针对分区的循环,因此,后者循环中每个元素是一个分区下的iterator,返回的也是iterator。看如下代码:
//创建一个向量,存储在两个分区中
val a = sc.parallelize(0 to 10, 2)
//使用mapPartitions循环,注意mapPartitions也是懒加载的,同时,里面用了map循环输出,但是这里并不会打印出真实的数据
a.mapPartitions(points => {
points.map(point =>
println(point)
)
})
//即便里面用了foreach循环输出,但是这里依然不会打印出真实的数据
a.mapPartitions(points => {
points.foreach(point =>
println(point)
)
points
})
//外层采用foreachPartition循环,里面用了map循环输出,这里依然不会打印出真实的数据
a.foreachPartition(points => {
points.map(point =>
println(point)
)
})
//外层采用foreachPartition循环,里面用了foreach循环输出,这里会打印出真实的数据!!!
a.foreachPartition(points => {
points.foreach(point =>
println(point)
)
})
注意,除了第二个例子中内层循环外有points,其他都没有,这里是因为mapPartitions需要返回一个iterator,第一个内部循环了map,因此已经有了这个作为返回,所以不会出错。第二个内部循环用的是foreach,返回的是Unit,不符合要求,因此加了points作为返回。而foreachPartition返回值也是Unit,因此,可以在任何场景下都不需要points
欢迎大家关注DataLearner官方微信,接受最新的AI技术推送
