Spark RDD基础编程详解(一):创建与转换操作
一、RDD简介1.1 什么是RDDRDDResilient Distributed Dataset弹性分布式数据集是Spark中最基本的数据抽象。通俗来讲RDD是一种用于表示分布式计算中数据集合的不可变数据结构。RDD的核心特征特征说明弹性Resilient具备容错能力节点故障时可自动恢复数据和计算分布式Distributed数据分散存储在集群多个节点上支持并行处理数据集Dataset由一系列记录组成可看作不可变的、可分区的数据集合惰性计算转换操作仅记录轨迹遇到行动操作才触发真正计算可缓存可缓存在内存中在多次计算中重用1.2 RDD的五大特性一组分区A list of partitionsRDD由多个分区组成每个分区是数据集的一个子集计算函数A function for computing each split每个分区都有对应的计算函数依赖关系A list of dependencies on other RDDsRDD之间通过依赖关系形成血统Lineage分区器Optionally, a Partitioner for key-value RDDs对于KV型RDD可指定分区方式优先位置Optionally, a list of preferred locations数据本地性优先在数据所在节点计算二、RDD的创建Spark主要通过SparkContext的textFile()和parallelize()方法创建RDD。2.1 环境准备importorg.apache.spark.{SparkConf,SparkContext}// 创建SparkConf对象配置应用名称和运行模式valconfnewSparkConf().setAppName(RDD-Demo).setMaster(local)// local表示本地单线程执行local[*]表示本地多线程// 创建SparkContext对象这是Spark程序的入口valscnewSparkContext(conf)注意Spark 2.0推荐使用SparkSession作为统一入口但在RDD编程中仍需要SparkContext。2.2 从文件系统加载数据创建RDD2.2.1 从本地文件系统加载objectCreateRddByFileScala{defmain(args:Array[String]):Unit{valconfnewSparkConf().setAppName(CreateRddByFileScala).setMaster(local)valscnewSparkContext(conf)// Windows系统路径valpathD:\test\data// Linux/Mac系统路径// val path file:///usr/local/test/data/// textFile(path, minPartitions) 第二个参数指定最小分区数valrddsc.textFile(path,2)// 统计文件内数据的总长度vallengthrdd.map(_.length).reduce(__)println(s文件总字符数:$length)sc.stop()}}textFile()方法详解参数类型说明pathString文件路径支持本地文件、HDFS、S3等minPartitionsInt最小分区数可选默认根据文件大小和块大小计算2.2.2 从HDFS加载数据// 只需修改路径为HDFS路径valpathhdfs://hadoop101:9000/test/valrddsc.textFile(path,2)2.2.3 从多个文件加载// 加载目录下所有文件valrddsc.textFile(hdfs://hadoop101:9000/logs/)// 加载匹配通配符的文件valrddsc.textFile(hdfs://hadoop101:9000/logs/2024-*.log)// 加载整个目录包括子目录valrddsc.wholeTextFiles(hdfs://hadoop101:9000/logs/)2.3 通过并行集合数组创建RDD调用SparkContext.parallelize()方法将已存在的集合转换为RDD。objectCreateRddByArrayScala{defmain(args:Array[String]):Unit{valconfnewSparkConf().setAppName(CreateRddByArrayScala).setMaster(local)valscnewSparkContext(conf)// 创建Scala集合valarrArray(1,2,3,4,5)// 基于集合创建RDD可指定分区数valrddsc.parallelize(arr,2)// 查看RDD的分区数println(s分区数:${rdd.getNumPartitions})// 查看各分区数据rdd.glom().collect().foreach(partitionprintln(partition.mkString(,)))sc.stop()}}parallelize()方法详解参数类型说明seqSeq[T]要并行化的集合numSlicesInt分区数可选默认根据集群配置计算其他创建方式// 从List创建vallistRddsc.parallelize(List(1,2,3,4,5))// 从Range创建valrangeRddsc.parallelize(1to100)// 从Seq创建valseqRddsc.parallelize(Seq(a,b,c))// makeRDD是parallelize的别名更语义化valrddsc.makeRDD(Array(1,2,3,4,5),2)2.4 创建方式对比创建方式适用场景特点textFile()读取文件数据支持本地、HDFS、S3等多种文件系统parallelize()测试数据、小数据集将内存集合转换为分布式RDDwholeTextFiles()读取小文件返回(K,V)对K是文件名V是文件内容sequenceFile()读取SequenceFileHadoop生态的二进制键值对文件三、RDD操作概述RDD的操作分为两大类操作类型特点返回值示例转换操作Transformation惰性执行记录转换轨迹新的RDDmap、filter、flatMap行动操作Action触发真正计算值或结果count、collect、reduce惰性机制图解RDD1 → map() → RDD2 → filter() → RDD3 → reduce() → 结果 ↓ ↓ ↓ 记录轨迹 记录轨迹 记录轨迹 不计算 不计算 触发计算四、核心转换操作详解4.1 filter() — 过滤操作功能对RDD中的每个元素应用过滤函数保留返回true的元素。签名def filter(f: T Boolean): RDD[T]示例筛选包含Spark的行objectFilterDemo{defmain(args:Array[String]):Unit{valconfnewSparkConf().setAppName(filter-test).setMaster(local)valscnewSparkContext(conf)// 加载数据valrdd:RDD[String]sc.textFile(data/word.txt)// filter参数是一个函数返回Boolean类型// true - 保留该元素false - 过滤掉vallineWithSpark:RDD[String]rdd.filter(line{line.contains(Spark)})// 输出结果lineWithSpark.foreach(println)sc.stop()}}输入数据word.txtHadoop is good Spark is better Spark is fast运行结果Spark is better Spark is fast简写形式// 当函数体只有一行时可省略花括号valresultrdd.filter(_.contains(Spark))4.2 map() — 映射操作功能对RDD中的每个元素应用映射函数返回一个新的RDD。签名def map[U: ClassTag](f: T U): RDD[U]示例1数值翻倍valarrArray(1,2,3,4,5)valrdd1sc.parallelize(arr)// 每个元素乘以2valrdd2rdd1.map(numnum*2)rdd2.foreach(println)// 输出: 2, 4, 6, 8, 10示例2字符串拆分valrdd1sc.textFile(data/word.txt)// 将每行按空格拆分为数组valrdd2rdd1.map(lineline.split( ))rdd2.foreach(arrprintln(arr.mkString(,)))数据转换过程输入RDD: Hadoop is good Spark is better Spark is fast map(line line.split( ))后: Array(Hadoop, is, good) Array(Spark, is, better) Array(Spark, is, fast)注意map()操作后RDD的元素类型从String变成了Array[String]这是一个二维结构。4.3 flatMap() — 扁平化映射功能先对RDD中的每个元素执行map()操作再对结果执行flatten()扁平化操作将多维结构展平为一维。签名def flatMap[U: ClassTag](f: T TraversableOnce[U]): RDD[U]示例单词拆分扁平化valrdd1sc.textFile(data/word.txt)// flatMap map flattenvalrdd2rdd1.flatMap(lineline.split( ))rdd2.foreach(println)数据转换过程输入RDD: Hadoop is good Spark is better Spark is fast Step 1 - map(line line.split( )): Array(Hadoop, is, good) Array(Spark, is, better) Array(Spark, is, fast) Step 2 - flatten: Hadoop is good Spark is better Spark is fastmap() vs flatMap() 对比操作输入输出维度变化mapRDD[String]RDD[Array[String]]1维 → 2维flatMapRDD[String]RDD[String]1维 → 1维展平4.4 groupByKey() — 按键分组功能将RDD中相同Key的元素分组返回(Key, Iterable[Value])形式的RDD。签名def groupByKey(): RDD[(K, Iterable[V])]示例单词分组valrdd1sc.textFile(data/word.txt)valrdd2rdd1.flatMap(_.split( ))// 将单词映射为 (word, 1) 的键值对valrdd3rdd2.map(word(word,1))// RDD((Hadoop,1), (is,1), (good,1), (Spark,1), (is,1), ...)// 按Key分组相同Key的Value放入Iterablevalrdd4rdd3.groupByKey()// RDD((Hadoop, Iterable(1)), (is, Iterable(1,1,1)), ...)// 统计每个单词出现的次数valrdd5rdd4.map(t(t._1,t._2.size))// RDD((Hadoop,1), (is,3), (good,1), (Spark,2), ...)rdd5.foreach(println)运行结果(Spark,2) (is,3) (fast,1) (good,1) (better,1) (Hadoop,1)groupByKey()的数据流转rdd3: RDD[(String, Int)] (Hadoop, 1) (is, 1) (good, 1) (Spark, 1) (is, 1) (better, 1) (Spark, 1) (is, 1) (fast, 1) ↓ groupByKey() rdd4: RDD[(String, Iterable[Int])] (Hadoop, Iterable(1)) (is, Iterable(1, 1, 1)) (good, Iterable(1)) (Spark, Iterable(1, 1)) (better, Iterable(1)) (fast, Iterable(1))4.5 reduceByKey() — 按键聚合功能将RDD中相同Key的Value进行聚合计算返回(Key, AggregatedValue)形式的RDD。签名def reduceByKey(func: (V, V) V): RDD[(K, V)]示例WordCount优化版valrdd1sc.textFile(data/word.txt)valrdd2rdd1.flatMap(_.split( ))valrdd3rdd2.map(word(word,1))// reduceByKey在Map端先进行预聚合再Shuffle效率高于groupByKeyvalrdd4rdd3.reduceByKey(__)// 等价于: rdd3.reduceByKey((v1, v2) v1 v2)rdd4.foreach(println)运行结果(Spark,2) (is,3) (fast,1) (good,1) (better,1) (Hadoop,1)reduceByKey() vs groupByKey() 对比特性reduceByKeygroupByKey返回类型RDD[(K, V)]RDD[(K, Iterable[V])]Map端预聚合✅ 有❌ 无Shuffle数据量少已预聚合多全量传输内存占用低高需存储Iterable适用场景聚合计算求和、计数等需要保留所有原始值reduceByKey工作原理Map端预聚合: 分区1: (is,1), (is,1) → (is,2) 分区2: (is,1), (Spark,1), (Spark,1) → (is,1), (Spark,2) Shuffle后Reduce端: (is,2) (is,1) → (is,3) (Spark,2) → (Spark,2)五、完整实战WordCount综合以上转换操作实现经典的WordCount程序importorg.apache.spark.{SparkConf,SparkContext}objectWordCount{defmain(args:Array[String]):Unit{valconfnewSparkConf().setAppName(WordCount).setMaster(local)valscnewSparkContext(conf)// 1. 读取文件创建RDDvallinessc.textFile(data/word.txt)// 2. flatMap拆分单词valwordslines.flatMap(_.split( ))// 3. map转换为 (word, 1) 键值对valwordPairswords.map((_,1))// 4. reduceByKey聚合统计valwordCountswordPairs.reduceByKey(__)// 5. 按词频降序排序行动操作触发计算valsortedwordCounts.sortBy(_._2,ascendingfalse)// 6. collect收集结果到Driver并打印sorted.collect().foreach(println)sc.stop()}}输出结果(is,3) (Spark,2) (Hadoop,1) (good,1) (better,1) (fast,1)六、转换操作速查表转换操作功能输入 → 输出是否触发Shufflemap(f)一对一映射RDD[T]→RDD[U]❌filter(f)条件过滤RDD[T]→RDD[T]❌flatMap(f)映射后扁平化RDD[T]→RDD[U]❌mapPartitions(f)按分区映射RDD[T]→RDD[U]❌sample(fraction)随机采样RDD[T]→RDD[T]❌union(other)合并两个RDDRDD[T]RDD[T]→RDD[T]❌distinct()去重RDD[T]→RDD[T]✅groupByKey()按键分组RDD[(K,V)]→RDD[(K, Iterable[V])]✅reduceByKey(f)按键聚合RDD[(K,V)]→RDD[(K,V)]✅sortByKey()按键排序RDD[(K,V)]→RDD[(K,V)]✅join(other)内连接RDD[(K,V)]RDD[(K,W)]→RDD[(K,(V,W))]✅coalesce(n)减少分区RDD[T]→RDD[T]❌窄依赖repartition(n)重新分区RDD[T]→RDD[T]✅七、总结本文系统讲解了RDD的创建方式和核心转换操作RDD创建通过textFile()从文件系统加载或通过parallelize()从集合创建惰性机制转换操作记录轨迹不计算行动操作触发真正执行核心算子filter()条件过滤保留满足条件的元素map()一对一映射转换元素类型或值flatMap()映射扁平化将多维结构展平groupByKey()按键分组返回(Key, Iterable[Value])reduceByKey()按键聚合Map端预聚合减少Shuffle最佳实践聚合场景优先使用reduceByKey替代groupByKey