moocer 发表于 2020-6-15 17:55

【Sprak】【笔记】Sprak的RDD的Action算子笔记

本帖最后由 moocer 于 2020-6-15 20:24 编辑

Action算子
1 reduce(func)案例
1. 作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
2. 需求:创建一个RDD,将所有元素聚合得到结果
(1)创建一个RDD
scala> val rdd1 = sc.makeRDD(1 to 10,2)rdd1: org.apache.spark.rdd.RDD = ParallelCollectionRDD at makeRDD at <console>:24
(2)聚合RDD所有元素
scala> rdd1.reduce(_+_)res50: Int = 55
(3)创建一个RDD
scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD at makeRDD at <console>:24
(4)聚合RDD所有数据scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))res1: (String, Int) = (aacd,12)2 collect()案例1. 作用:在驱动程序中,以数组的形式返回数据集的所有元素。2. 需求:创建一个RDD,并将RDD内容收集到Driver端打印(1)创建一个RDDscala> val rdd = sc.parallelize(1 to 10)rdd:org.apache.spark.rdd.RDD = ParallelCollectionRDD at parallelize at<console>:24(2)将结果收集到Driver端scala> rdd.collectres4: Array = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

3 count()案例
1. 作用:返回RDD中元素的个数
2. 需求:创建一个RDD,统计该RDD的条数
(1)创建一个RDD
scala> val rdd = sc.parallelize(1 to 10)rdd: org.apache.spark.rdd.RDD = ParallelCollectionRDD at parallelize at <console>:24
(2)统计该RDD的条数scala> rdd.count
res5: Long = 10

4 first()案例
1. 作用:返回RDD中的第一个元素
2. 需求:创建一个RDD,返回该RDD中的第一个元素
(1)创建一个RDD
scala> val rdd = sc.parallelize(1 to 10)rdd: org.apache.spark.rdd.RDD = ParallelCollectionRDD at parallelize at <console>:24
(2)统计该RDD的条数scala> rdd.first
res6: Int = 1

5 take(n)案例
1. 作用:返回一个由RDD的前n个元素组成的数组
2. 需求:创建一个RDD,统计该RDD的条数
(1)创建一个RDD
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))rdd: org.apache.spark.rdd.RDD = ParallelCollectionRDD at parallelize at <console>:24
(2)统计该RDD的条数scala> rdd.take(3)
res7: Array = Array(2, 5, 4)

6 takeOrdered(n)案例
1. 作用:返回该RDD排序后的前n个元素组成的数组
2. 需求:创建一个RDD,统计该RDD的条数
(1)创建一个RDD
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))rdd: org.apache.spark.rdd.RDD = ParallelCollectionRDD at parallelize at <console>:24
(2)统计该RDD的条数scala> rdd.takeOrdered(3)
res8: Array = Array(2, 3, 4)

7 aggregate案例
1. 参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
2. 作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
3. 需求:创建一个RDD,将所有元素相加得到结果
(1)创建一个RDD
scala> var rdd1 = sc.makeRDD(1 to 10,2)rdd1: org.apache.spark.rdd.RDD = ParallelCollectionRDD at makeRDD at <console>:24
(2)将该RDD所有元素相加得到结果scala> rdd1.aggregate(0)(_+_,_+_)
res13: Int = 55
补充:
var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD = ParallelCollectionRDD at makeRDD at <console>:24
修改初始值为1
scala> rdd1.aggregate(1)(_+_,_+_)
res23: Int = 58
计算的时候分区内计算加上初始值,分区合并也要计算一遍初始值,所以计算的时候就要加上分区数乘以初始值
8 fold(num)(func)案例
1. 作用:折叠操作,aggregate的简化操作,seqop和combop一样。
2. 需求:创建一个RDD,将所有元素相加得到结果
(1)创建一个RDD
scala> var rdd1 = sc.makeRDD(1 to 10,2)rdd1: org.apache.spark.rdd.RDD = ParallelCollectionRDD at makeRDD at <console>:24
(2)将该RDD所有元素相加得到结果scala> rdd.fold(0)(_+_)
res14: Int = 28

9 saveAsTextFile(path)
作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
10 saveAsSequenceFile(path)
作用:将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
11 saveAsObjectFile(path)
作用:用于将RDD中的元素序列化成对象,存储到文件中。
12 countByKey()案例
1. 作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
2. 需求:创建一个PairRDD,统计每种key的个数
(1)创建一个PairRDD
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD at parallelize at <console>:24
(2)统计每种key的个数
scala> rdd.countByKeyres15: scala.collection.Map = Map(3 -> 2, 1 -> 3, 2 -> 1)


13 foreach(func)案例
1. 作用:在数据集的每一个元素上,运行函数func进行更新。
2. 需求:创建一个RDD,对每个元素进行打印
(1)创建一个RDD
scala> var rdd = sc.makeRDD(1 to 5,2)rdd: org.apache.spark.rdd.RDD = ParallelCollectionRDD at makeRDD at <console>:24
(2)对该RDD每个元素进行打印
scala> rdd.foreach(println(_))12345




页: [1]
查看完整版本: 【Sprak】【笔记】Sprak的RDD的Action算子笔记