所谓行动算子,其实就是触发作业执行的方法,在Spark中,常用的行动算子如下
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
示例:计算数组所有元素的和
package cn.libins.learn.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object OperatorRDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// reduce算子计算执行直接出结果
val reduceResult = rdd.reduce((x, y) => x + y)
println(reduceResult)
sc.stop()
}
}
输出结果
10
在驱动程序中,以数组 Array 的形式返回数据集的所有元素。即:将一个RDD不同分区的数据按照分区顺序采集到Driver端内存中,形成数组。
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val collectResult = rdd.collect()
println(collectResult.mkString("|"))
输出结果
1|2|3|4
返回 RDD 中元素的个数
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
val countResult = rdd.count()
println(countResult)
输出结果
6
返回 RDD 中的第一个元素
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
val firstResult = rdd.first()
println(firstResult)
输出结果
1
返回一个由 RDD 的前 n 个元素组成的数组
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
val takeResult = rdd.take(3)
println(takeResult.mkString("|"))
输出结果
1|2|3
返回该 RDD 排序后的前 n 个元素组成的数组
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
val takeOrderResult = rdd.takeOrdered(3)(Ordering.Int.reverse)
println(takeOrderResult.mkString("|"))
输出结果
6|5|4
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
val result = rdd.aggregate(0)(
(a, b) => a + b, // 分区内相加
(x, y) => x + y // 分区间相加
)
println(result)
输出结果
10
这两个算子分别表示:统计给定的RDD中,每个key出现的次数和每个value出现的次数。
示例
val rdd1 = sc.makeRDD(List(1, 1, 1, 3, 3, 5))
val rdd2 = sc.makeRDD(List(
("a", 1), ("b", 2), ("a", 2), ("b", 4), ("a", 5)
))
val valueCnt = rdd1.countByValue()
val keyCnt = rdd2.countByKey()
println("valueCnt
" + valueCnt)
println("keyCnt
" + keyCnt)
输出结果
在rdd1中:1出现3次,3出现2次,5出现1次
在rdd2中:a出现3次,b出现2次
将数据保存到不同格式的文件中
// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件,要求rdd必须是key/value类型
rdd.map((_,1)).saveAsSequenceFile("output2")
分布式遍历 RDD 中的每一个元素,调用指定函数。
示例:演示两种foreach的场景
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
// 其实是Driver端内存集合的遍历方法(需要将各个分区的数据拉回Driver再打印)
// 从而保证打印的数据是按照分区顺序打印的
rdd.collect().foreach(println)
println("********************")
//Executor端内存数据打印(打印的数据无序)
rdd.foreach(println)
输出结果
可以看到下半部分因为是在分布式节点上执行的,所以打印顺序无法保证