当前位置: 首页 >> Spark >> Spark RDD行动算子

Spark RDD行动算子

2023-09-10 22:16:06 星期日  发表于北京  阅读:835


所谓行动算子,其实就是触发作业执行的方法,在Spark中,常用的行动算子如下

reduce算子

聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
示例:计算数组所有元素的和

  1. package cn.libins.learn.spark.core.rdd
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object OperatorRDD {
  4. def main(args: Array[String]): Unit = {
  5. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  6. val sc = new SparkContext(sparkConf)
  7. val rdd = sc.makeRDD(List(1, 2, 3, 4))
  8. // reduce算子计算执行直接出结果
  9. val reduceResult = rdd.reduce((x, y) => x + y)
  10. println(reduceResult)
  11. sc.stop()
  12. }
  13. }

输出结果
10

collect算子

在驱动程序中,以数组 Array 的形式返回数据集的所有元素。即:将一个RDD不同分区的数据按照分区顺序采集到Driver端内存中,形成数组。

  1. val rdd = sc.makeRDD(List(1, 2, 3, 4))
  2. val collectResult = rdd.collect()
  3. println(collectResult.mkString("|"))

输出结果
1|2|3|4

count算子

返回 RDD 中元素的个数

  1. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  2. val countResult = rdd.count()
  3. println(countResult)

输出结果
6

first算子

返回 RDD 中的第一个元素

  1. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  2. val firstResult = rdd.first()
  3. println(firstResult)

输出结果
1

take算子

返回一个由 RDD 的前 n 个元素组成的数组

  1. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  2. val takeResult = rdd.take(3)
  3. println(takeResult.mkString("|"))

输出结果
1|2|3

takeOrdered算子

返回该 RDD 排序后的前 n 个元素组成的数组

  1. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  2. val takeOrderResult = rdd.takeOrdered(3)(Ordering.Int.reverse)
  3. println(takeOrderResult.mkString("|"))

输出结果
6|5|4

aggregate算子

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

  1. val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
  2. val result = rdd.aggregate(0)(
  3. (a, b) => a + b, // 分区内相加
  4. (x, y) => x + y // 分区间相加
  5. )
  6. println(result)

输出结果
10

countByKey、countByValue算子

这两个算子分别表示:统计给定的RDD中,每个key出现的次数和每个value出现的次数。
示例

  1. val rdd1 = sc.makeRDD(List(1, 1, 1, 3, 3, 5))
  2. val rdd2 = sc.makeRDD(List(
  3. ("a", 1), ("b", 2), ("a", 2), ("b", 4), ("a", 5)
  4. ))
  5. val valueCnt = rdd1.countByValue()
  6. val keyCnt = rdd2.countByKey()
  7. println("valueCnt
  8. " + valueCnt)
  9. println("keyCnt
  10. " + keyCnt)

输出结果

在rdd1中:1出现3次,3出现2次,5出现1次
在rdd2中:a出现3次,b出现2次

save相关算子

将数据保存到不同格式的文件中

  1. // 保存成 Text 文件
  2. rdd.saveAsTextFile("output")
  3. // 序列化成对象保存到文件
  4. rdd.saveAsObjectFile("output1")
  5. // 保存成 Sequencefile 文件,要求rdd必须是key/value类型
  6. rdd.map((_,1)).saveAsSequenceFile("output2")

foreach算子

分布式遍历 RDD 中的每一个元素,调用指定函数。
示例:演示两种foreach的场景

  1. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  2. // 其实是Driver端内存集合的遍历方法(需要将各个分区的数据拉回Driver再打印)
  3. // 从而保证打印的数据是按照分区顺序打印的
  4. rdd.collect().foreach(println)
  5. println("********************")
  6. //Executor端内存数据打印(打印的数据无序)

rdd.foreach(println)
输出结果

可以看到下半部分因为是在分布式节点上执行的,所以打印顺序无法保证