登 录
注 册
< 大 数 据
Flink
Hadoop
Spark
Hive
HBase
Kafka
其他框架
Spark简介
Spark开发环境初始化
Spark运行架构与核心组件
Spark RDD详解
Spark RDD转换算子-单Value类型
Spark RDD转换算子-其他类型
Spark RDD行动算子
Spark Shuffle的设计思想
Spark SQL Hints用法总结
热门推荐>>>
中台架构
中台建设与架构
Hadoop
源码分析-NN启动(三)
HBase
HBased对接Hive
Linux
Nginx高可用
Python
数据导出工具
Flink
3分钟搭建Flink SQL测试环境
Kafka
Kafka对接Flume
深度学习
卷积神经网络
数据结构与算法
选择合适的算法
MySQL
数据备份恢复
计算机系统
信号量同步线程
Hive
Hive调优参数大全
其他框架
Azkaban Flow1.0与2.0
ClickHouse
表引擎-其他类型
技术成长
最好的职业建议
精选书单
技术成长书单—机器学习
技术资讯
数据在线:计算将成为公共服务
开发工具
IntelliJ IDEA 20年发展回顾(二)
系统工具
Mac命令行工具
虚拟化
内存虚拟化概述
云原生
云原生构建现代化应用
云服务
一文搞懂公有云、私有云...
Java
Spring Boot依赖注入与Runners
Go
Go函数与方法
SQL
SQL模板
当前位置:
首页
>>
Spark
>>
Spark RDD行动算子
Spark RDD行动算子
2023-09-10 22:16:06 星期日 发表于北京 阅读:561
![](/static/images/article_images/1694355151.632511.jpeg) 所谓行动算子,其实就是触发作业执行的方法,在Spark中,常用的行动算子如下 #### reduce算子 聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据 示例:计算数组所有元素的和 ``` package cn.libins.learn.spark.core.rdd import org.apache.spark.{SparkConf, SparkContext} object OperatorRDD { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1, 2, 3, 4)) // reduce算子计算执行直接出结果 val reduceResult = rdd.reduce((x, y) => x + y) println(reduceResult) sc.stop() } } ``` 输出结果 10 #### collect算子 在驱动程序中,以数组 Array 的形式返回数据集的所有元素。即:将一个RDD不同分区的数据按照分区顺序采集到Driver端内存中,形成数组。 ``` val rdd = sc.makeRDD(List(1, 2, 3, 4)) val collectResult = rdd.collect() println(collectResult.mkString("|")) ``` 输出结果 1|2|3|4 #### count算子 返回 RDD 中元素的个数 ``` val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) val countResult = rdd.count() println(countResult) ``` 输出结果 6 #### first算子 返回 RDD 中的第一个元素 ``` val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) val firstResult = rdd.first() println(firstResult) ``` 输出结果 1 #### take算子 返回一个由 RDD 的前 n 个元素组成的数组 ``` val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) val takeResult = rdd.take(3) println(takeResult.mkString("|")) ``` 输出结果 1|2|3 #### takeOrdered算子 返回该 RDD 排序后的前 n 个元素组成的数组 ``` val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) val takeOrderResult = rdd.takeOrdered(3)(Ordering.Int.reverse) println(takeOrderResult.mkString("|")) ``` 输出结果 6|5|4 #### aggregate算子 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合 ``` val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) val result = rdd.aggregate(0)( (a, b) => a + b, // 分区内相加 (x, y) => x + y // 分区间相加 ) println(result) ``` 输出结果 10 #### countByKey、countByValue算子 这两个算子分别表示:统计给定的RDD中,每个key出现的次数和每个value出现的次数。 示例 ``` val rdd1 = sc.makeRDD(List(1, 1, 1, 3, 3, 5)) val rdd2 = sc.makeRDD(List( ("a", 1), ("b", 2), ("a", 2), ("b", 4), ("a", 5) )) val valueCnt = rdd1.countByValue() val keyCnt = rdd2.countByKey() println("valueCnt " + valueCnt) println("keyCnt " + keyCnt) ``` 输出结果 ![](/static/images/article_images/1694355356.372973.png) 在rdd1中:1出现3次,3出现2次,5出现1次 在rdd2中:a出现3次,b出现2次 #### save相关算子 将数据保存到不同格式的文件中 ``` // 保存成 Text 文件 rdd.saveAsTextFile("output") // 序列化成对象保存到文件 rdd.saveAsObjectFile("output1") // 保存成 Sequencefile 文件,要求rdd必须是key/value类型 rdd.map((_,1)).saveAsSequenceFile("output2") ``` #### foreach算子 分布式遍历 RDD 中的每一个元素,调用指定函数。 示例:演示两种foreach的场景 ``` val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) // 其实是Driver端内存集合的遍历方法(需要将各个分区的数据拉回Driver再打印) // 从而保证打印的数据是按照分区顺序打印的 rdd.collect().foreach(println) println("********************") //Executor端内存数据打印(打印的数据无序) ``` rdd.foreach(println) 输出结果 ![](/static/images/article_images/1694355308.774977.png) 可以看到下半部分因为是在分布式节点上执行的,所以打印顺序无法保证