Spark常用算子实践总结

Posted by Yezhiwei on May 3, 2018

mapPartitions 与 map 的区别

与 map 方法类似,map 是对 rdd 中的每一个元素进行操作,而 mapPartitions(foreachPartition) 则是对 rdd 中的每个分区的迭代器进行操作。如果在 map 过程中需要频繁创建额外的对象,例如将 rdd 中的数据通过 JDBC 连接写入数据库,map 需要为每个元素创建一个 Connection 链接,开销很大;而 mapPartition 为每个 partition 创建一个 Connection 链接。因此,mapPartitions 效率比 map 高的多。

SparkSql 或 DataFrame 默认会对程序进行 mapPartition 的优化。

var rdd1 = sc.makeRDD(List(1, 5, 3, 2, 4),2)
var rdd2 = rdd1.mapPartitions(it => {
val result = List[Int]()
var i = 0
while (it.hasNext) {
  val next = it.next()
  i += next
  println(next + "---")
}

result.::(i).iterator
})

rdd2.collect().foreach(println)

// 输出
18/04/24 18:08:17 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at mapPartitions at OperatorLearn.scala:126)
18/04/24 18:08:17 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
18/04/24 18:08:17 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5292 bytes)
18/04/24 18:08:17 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
1---
5---
18/04/24 18:08:17 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 743 bytes result sent to driver
18/04/24 18:08:17 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1, PROCESS_LOCAL, 5296 bytes)
18/04/24 18:08:17 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
18/04/24 18:08:17 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 81 ms on localhost (1/2)
3---
2---
4---
18/04/24 18:08:17 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 743 bytes result sent to driver
18/04/24 18:08:17 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 19 ms on localhost (2/2)
18/04/24 18:08:17 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/04/24 18:08:17 INFO DAGScheduler: ResultStage 0 (collect at OperatorLearn.scala:138) finished in 0.112 s
18/04/24 18:08:17 INFO DAGScheduler: Job 0 finished: collect at OperatorLearn.scala:138, took 0.466785 s
6
9

foreachPartition和mapPartitions的分别

通过查看 foreachPartition 的源码发现返回值是空,可以看出 foreachPartition 应该属于 Action 运算操作,而 mapPartitions 是转化操作,此外在应用场景上区别是 mapPartitions 可以获取返回值,继续在返回 RDD 上做其他的操作,而 foreachPartition 因为没有返回值并且是 Action 操作,所以使用它一般都是在程序末尾,如:要落地数据到存储系统中如 MySQL,ES,或者 HBase 中,可以用它。

在 Transformation 中也可以落地数据,但是它必须依赖 Action 操作来触发它,因为 Transformation 操作是延迟执行的,如果没有任何 Action 方法来触发,那么 Transformation 操作是不会被执行的,这一点需要注意。

foreachPartition例子

val rdd=sc.parallelize(Seq(1,2,3,4,5),3)
    
rdd.foreachPartition(partiton=>{
  // partiton.size 不能执行这个方法,否则下面的foreach方法里面会没有数据,
  //因为iterator只能被执行一次
  partiton.foreach(line=>{
    //save(line)  落地数据
  })

})
    
sc.stop()

mapPartitions 例子

val rdd=sc.parallelize(Seq(1,2,3,4,5),3)

rdd.mapPartitions(partiton=>{
  //只能用map,不能用foreach,因为foreach没有返回值
  partiton.map(line=>{
    //save line
  }
  )
})

rdd.count()//需要action,来触发执行
sc.stop()

combineByKey函数详解

计算总成绩的例子

val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
val d1 = sc.parallelize(initialScores, 2)
d1.combineByKey(score => {score}, (c1 : Double, newScore) => {c1 + newScore}, (c1: Double, c2 : Double) => {c1 + c2})
.map{case (name, score) => (name, score) }.collect().foreach(println)

sc.stop()

// 输出
(Wilma,286.0)
(Fred,274.0)
def combineByKey[C](  
      createCombiner: V => C,  
      mergeValue: (C, V) => C,  
      mergeCombiners: (C, C) => C,  
      partitioner: Partitioner,  
      mapSideCombine: Boolean = true,  
      serializer: Serializer = null) 

3个重要的函数参数说明:

createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)

mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)

mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)