登 录
注 册
< 大 数 据
Flink
Hadoop
Spark
Hive
HBase
Kafka
其他框架
Spark简介
Spark开发环境初始化
Spark运行架构与核心组件
Spark RDD详解
Spark RDD转换算子-单Value类型
Spark RDD转换算子-其他类型
Spark RDD行动算子
Spark Shuffle的设计思想
Spark SQL Hints用法总结
热门推荐>>>
中台架构
中台建设与架构
Hadoop
源码分析-NN启动(三)
HBase
HBased对接Hive
Linux
Nginx高可用
Python
数据导出工具
Flink
3分钟搭建Flink SQL测试环境
Kafka
Kafka对接Flume
深度学习
卷积神经网络
数据结构与算法
选择合适的算法
MySQL
数据备份恢复
计算机系统
信号量同步线程
Hive
Hive调优参数大全
其他框架
Azkaban Flow1.0与2.0
ClickHouse
表引擎-其他类型
技术成长
最好的职业建议
精选书单
技术成长书单—机器学习
技术资讯
数据在线:计算将成为公共服务
开发工具
IntelliJ IDEA 20年发展回顾(二)
系统工具
Mac命令行工具
虚拟化
内存虚拟化概述
云原生
云原生构建现代化应用
云服务
一文搞懂公有云、私有云...
Java
Spring Boot依赖注入与Runners
Go
Go函数与方法
SQL
SQL模板
当前位置:
首页
>>
Spark
>>
Spark RDD转换算子-其他类型
Spark RDD转换算子-其他类型
2023-09-10 22:01:02 星期日 发表于北京 阅读:580
![](/static/images/article_images/1694354302.514945.jpeg) RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行 本节介绍双Value类型及Key-Value类型算子。**所谓双value类型,就是该算子可接收两个数据源(RDD)** 注:单value类型算子可参考上一篇文章 #### intersection/union/subtract/zip算子 求两个集合的交集、并集、差集和拉链(zip)。 上述四个算子,除了zip以外需要两个集合(RDD)的数据类型一致 但是针对zip算子,要求两个RDD的分区数量一致、且每个rdd的每个分区中的元素个数一致 ``` package cn.libins.learn.spark.core.rdd import org.apache.spark.{SparkConf, SparkContext} object IntersectionRDD { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparkConf) val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5)) val rdd2 = sc.makeRDD(List(3, 4, 6, 7, 8)) val rddInter = rdd1.intersection(rdd2) // 交集 val rddUnion = rdd1.union(rdd2) // 并集 // 差集:以rdd1为基准,求rdd1与rdd2的差集 val rddSub = rdd1.subtract(rdd2) // 将两个rdd相同位置的数据拉到一起 val rddZip = rdd1.zip(rdd2) println("rddInter " + rddInter.collect().mkString("-")) println("rddUnion " + rddUnion.collect().mkString("-")) println("rddSub " + rddSub.collect().mkString("-")) println("rddZip " + rddZip.collect().mkString("-")) sc.stop() } } ``` 输出结果如下 ![](/static/images/article_images/1694354591.388951.png) #### 转换算子——Key-Value类型 ##### partitionBy算子 将数据按照指定 Partitioner (自定义分区规则)重新进行分区。Spark 默认的分区器是 HashPartitioner 示例:将原始的三个分区里的数据重新分区 ``` // 正常情况下,6个元素分到3个分区里,应该12,34,56分别在三个分区 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3) // 因为要根据key进行分区,而原始数据只有value,所以这里随便设置一个key val mapRdd = rdd.map((_, "libin")) // 通过分区器重分区(hash),打乱原有的分区 val partitionedRdd = mapRdd.partitionBy(new HashPartitioner(3)) partitionedRdd.saveAsTextFile("data/output/partition_by") sc.stop() ``` 查看输出文件的内容 ![](/static/images/article_images/1694354718.2394931.png) Spark自带的分区器除了上面演示的HashPartitioner外还有一个RangerPartitioner。后者一般在排序的时候使用。 ##### reduceByKey算子 可以将数据按照相同的 Key 对 Value 进行聚合。 示例:将所给的Key/value键值对按照key进行相加聚合 ``` val rdd = sc.makeRDD(Array(("a", 2), ("b", 3), ("a", 5), ("b", 5), ("c", 2))) val reduceRdd = rdd.reduceByKey((x: Int, y: Int) => { x + y }) reduceRdd.collect().foreach(println) 输出结果 (a,7) (b,8) (c,2) ``` ##### groupByKey算子 将数据源的数据根据 key 对 value 进行分组,形成对偶元组,元组中的第一个元素就是key,元组中的第二个元素就是相同key的value的集合。 示例:按照key将相同的value分到一个组立 ``` val rdd = sc.makeRDD(Array(("a", 2), ("b", 3), ("a", 5), ("b", 5), ("c", 2))) val groupRdd = rdd.groupByKey() groupRdd.collect().foreach(println) 输出结果 (a,CompactBuffer(2, 5)) (b,CompactBuffer(3, 5)) (c,CompactBuffer(2)) ``` ###### 跟groupBy算子的区别 groupByKey明确指定只能根据key来group by,且生成的组中没有key,即:(a,CompactBuffer(2, 5)) groupBy可以根据任何字段来group by,且生成的组中包含key值 ###### 跟reduceByKey算子的区别 从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的 数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较 高。 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚 合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那 么还是只能使用 groupByKey ##### aggregateByKey算子 将数据根据不同的规则进行分区内计算和分区间计算。这两种计算逻辑可以不一样。比如一个RDD有10个分区,分别求这10个分区的最大值(分区内),然后再将这10个最大值求和(分区间)。 注意:reduceByKey不管是分区内还是分区间要求计算逻辑一致(要么都求最大值,要么都求和) ``` // 将RDD分为两个分区:a1,a2 a3,a4 // 求出两个分区的最大值2,4 // 将两个分区的最大值求和=6 val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2) rdd.aggregateByKey(0)( // 0表示第一个元素比较的初始值 (x, y) => math.max(x, y), // 分区内求最大 (x, y) => x + y // 分区间求和 ).collect.foreach(println) 输出结果 (a,6) ``` ##### combineByKey算子 aggregateByKey算子存在一个问题就是:万一初始值设置为不合理,那么就对计算结果有影响。比如说求最大值的时候,如果你设置的初始值比所有值大,那么得到的最大值其实不是用户输入的最大值。 为了解决这个问题。spark引入了combineByKey算子 最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于 aggregate(),combineByKey()允许用户返回值的类型与输入不一致。 整体思路是:将用户输入的每个分区内的相同key的第一个值变换为该key的初始值。 示例:求每个key的平均值 ``` val rdd = sc.makeRDD(Array(("a", 2), ("b", 3), ("a", 5), ("b", 5), ("a", 2), ("b", 4)), 2) val combineRdd = rdd.combineByKey(v => (v, 1), // 将分区内的第一个元素视为初始值 (t: (Int, Int), v) => { // 求分区内相同key对应value的和,以及求该key的出现次数 (t._1 + v, t._2 + 1) }, (t1: (Int, Int), t2: (Int, Int)) => { // 求分区间相同key对应value的和,以及求该key的出现次数之和 (t1._1 + t2._1, t1._2 + t2._2) } ) // 此时combineRdd的为(a,(9,3)) (b,(12,3)) // 其中第一个元素为value的和,第二个元素为key的出现次数,需要将和除以次数得到该key的平均值 combineRdd.mapValues { case (num, cnt) => num / cnt }.collect().foreach(println) ``` 输出结果 (b,4) (a,3) ##### join/leftOuterJoin/rightOuterJoin算子 三个算子的功能类似SQL中的join:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD 要求两个RDD的key是相同的数据类型,如果相同的key在两个RDD中都有,那么会形成笛卡尔积,可能会导致内存OOM。所以,尽量少使用JOIN。 示例 ``` package cn.libins.learn.spark.core.rdd import org.apache.spark.{SparkConf, SparkContext} object JoinRDD { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparkConf) val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5))) val rdd2 = sc.makeRDD(List(("c", 1), ("a", 2), ("b", 3))) val joinRdd = rdd1.join(rdd2) val leftJoinRdd = rdd1.leftOuterJoin(rdd2) val rightJoinRdd = rdd1.rightOuterJoin(rdd2) println("joinRdd " + joinRdd.collect().mkString(" ")) println("leftJoinRdd " + leftJoinRdd.collect().mkString(" ")) println("rightJoinRdd " + rightJoinRdd.collect().mkString(" ")) sc.stop() } } ``` 输出结果中,没有关联到的就变成null ##### cogroup算子 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD ``` val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3))) val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3))) val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2) ```