吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1264|回复: 0
收起左侧

[其他转载] 【Sprak】【笔记】Sprak的RDD的转换之Value类型笔记

[复制链接]
moocer 发表于 2020-6-12 10:34
本帖最后由 moocer 于 2020-6-12 11:47 编辑

Value类型
1 map(func)案例
1. 作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
2. 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD
1)创建
[Scala] 纯文本查看 复制代码
scala> var source  = sc.parallelize(1 to 10)

[Scala] 纯文本查看 复制代码
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

2)打印
[Scala] 纯文本查看 复制代码
scala> source.collect()

[Scala] 纯文本查看 复制代码
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

3)将所有元素*2
[Scala] 纯文本查看 复制代码
scala> val mapadd = source.map(_ * 2)

[Scala] 纯文本查看 复制代码
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26

4)打印最终结果
[Scala] 纯文本查看 复制代码
scala> mapadd.collect()

[Scala] 纯文本查看 复制代码
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

2 mapPartitions(func) 案例
1. 作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为TRDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N,mapPartitions被调用M,一个函数一次处理所有分区。
2. 需求:创建一个RDD,使每个元素*2组成新的RDD
1)创建一个RDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(Array(1,2,3,4))

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

2)使每个元素*2组成新的RDD
[Scala] 纯文本查看 复制代码
scala> rpPartitions(x=>x.map(_*2))res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27

3)打印新的RDD
[Scala] 纯文本查看 复制代码
scala> res3.collectres4: Array[Int] = Array(2, 4, 6, 8)

3 mapPartitionsWithIndex(func) 案例
1. 作用:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为TRDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
2. 需求:创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD
1)创建一个RDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(Array(1,2,3,4))

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

2)使每个元素跟所在分区形成一个元组组成一个新的RDD
[Scala] 纯文本查看 复制代码
scala> val indexRdd = rpPartitionsWithIndex((index,items)=>(items.map((index,_))))

[Scala] 纯文本查看 复制代码
indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26

3)打印新的RDD
[Scala] 纯文本查看 复制代码
scala> indexRdd.collectres2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))

4 flatMap(func) 案例
1. 作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
2. 需求:创建一个数组中包含list的RDD,运用flatMap创建一个新的RDD,新的RDD取出每一个数组中的集合的各个元素
1)创建
[Scala] 纯文本查看 复制代码
scala> var sourceFlat  = sc.parallelize(Array(List(1,2),List(3,4)))

[Scala] 纯文本查看 复制代码
sourceFlat: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[10] at parallelize at <console>:14

2)打印
[Scala] 纯文本查看 复制代码
scala> sourceFlat.collect()

[Scala] 纯文本查看 复制代码
res14: Array[List[Int]] = Array(List(1, 2), List(3, 4))

3)根据原RDD创建新RDD()
[Scala] 纯文本查看 复制代码
scala> val flatMapResult = sourceFlat.flatMap(datas=>datas)

[Scala] 纯文本查看 复制代码
flatMapResult: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[11] at flatMap at <console>:15

4)打印新RDD
[Scala] 纯文本查看 复制代码
scala> flatMapResult.collect

[Scala] 纯文本查看 复制代码
res16: Array[Int] = Array(1, 2, 3, 4)

5 map()mapPartition()的区别
1. map():每次处理一条数据。
2. mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM
3. 开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。

6 glom案例
1. 作用:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
2. 需求:创建一个4个分区的RDD,并将每个分区的数据放到一个数组
1)创建
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(1 to 16,4)

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

2)将每个分区的数据放到一个数组并收集到Driver端打印
[Scala] 纯文本查看 复制代码
scala> rdd.glom().collect()

[Scala] 纯文本查看 复制代码
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))

7 groupBy(func)案例
1. 作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
2. 需求:创建一个RDD,按照元素模以2的值进行分组。
1)创建
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(1 to 4)

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

2)按照元素模以2的值进行分组
[Scala] 纯文本查看 复制代码
scala> val group = rdd.groupBy(_%2)

[Scala] 纯文本查看 复制代码
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26

3)打印结果
[Scala] 纯文本查看 复制代码
scala> group.collect

[Scala] 纯文本查看 复制代码
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

8 filter(func) 案例
1. 作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
2. 需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串)
1)创建
[Scala] 纯文本查看 复制代码
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))

[Scala] 纯文本查看 复制代码
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

2)打印
scala> sourceFilter.collect()[/mw_shl_code]
[Scala] 纯文本查看 复制代码
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)

3)过滤出含” xiao”子串的形成一个新的RDD
[Scala] 纯文本查看 复制代码
scala> val filter = sourceFilter.filter(_.contains("xiao"))

[Scala] 纯文本查看 复制代码
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26

4)打印新RDD
[Scala] 纯文本查看 复制代码
scala> filter.collect()

[Scala] 纯文本查看 复制代码
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

9 sample(withReplacement, fraction, seed) 案例
1. 作用:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
2. 需求:创建一个RDD1-10),从中选择放回和不放回抽样
1)创建RDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(1 to 10)

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24

2)打印
[Scala] 纯文本查看 复制代码
scala> rdd.collect()

[Scala] 纯文本查看 复制代码
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

3)放回抽样
[Scala] 纯文本查看 复制代码
scala> var sample1 = rdd.sample(true,0.4,2)

[Scala] 纯文本查看 复制代码
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26

4)打印放回抽样结果
[Scala] 纯文本查看 复制代码
scala> sample1.collect()

[Scala] 纯文本查看 复制代码
res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)

5)不放回抽样
[Scala] 纯文本查看 复制代码
scala> var sample2 = rdd.sample(false,0.2,3)

[Scala] 纯文本查看 复制代码
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26

6)打印不放回抽样结果
[Scala] 纯文本查看 复制代码
scala> sample2.collect()

[Scala] 纯文本查看 复制代码
res17: Array[Int] = Array(1, 9)

10 distinct([numTasks])) 案例
1. 作用:对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。
2. 需求:创建一个RDD,使用distinct()对其去重。
1)创建一个RDD
[Scala] 纯文本查看 复制代码
scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))

[Scala] 纯文本查看 复制代码
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24

2)对RDD进行去重(不指定并行度)
[Scala] 纯文本查看 复制代码
scala> val unionRDD = distinctRdd.distinct()

[Scala] 纯文本查看 复制代码
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26

3)打印去重后生成的新RDD
[Scala] 纯文本查看 复制代码
scala> unionRDD.collect()

[Scala] 纯文本查看 复制代码
res20: Array[Int] = Array(1, 9, 5, 6, 2)

4)对RDD(指定并行度为2
[Scala] 纯文本查看 复制代码
scala> val unionRDD = distinctRdd.distinct(2)

[Scala] 纯文本查看 复制代码
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26

5)打印去重后生成的新RDD
[Scala] 纯文本查看 复制代码
scala> unionRDD.collect()

[Scala] 纯文本查看 复制代码
res21: Array[Int] = Array(6, 2, 1, 9, 5)

11 coalesce(numPartitions) 案例
1. 作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
2. 需求:创建一个4个分区的RDD,对其缩减分区
1)创建一个RDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(1 to 16,4)

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24

2)查看RDD的分区数
[Scala] 纯文本查看 复制代码
scala> rdd.partitions.size

[Scala] 纯文本查看 复制代码
res20: Int = 4

3)对RDD重新分区
[Scala] 纯文本查看 复制代码
scala> val coalesceRDD = rdd.coalesce(3
)
[Scala] 纯文本查看 复制代码
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26

4)查看新RDD的分区数
[Scala] 纯文本查看 复制代码
scala> coalesceRDD.partitions.size

[Scala] 纯文本查看 复制代码
res21: Int = 3

2 repartition(numPartitions) 案例
1. 作用:根据分区数,重新通过网络随机洗牌所有数据。
2. 需求:创建一个4个分区的RDD,对其重新分区
1)创建一个RDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(1 to 16,4)

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24

2)查看RDD的分区数
[Scala] 纯文本查看 复制代码
scala> rdd.partitions.size

[Scala] 纯文本查看 复制代码
res22: Int = 4

3)对RDD重新分区
[Scala] 纯文本查看 复制代码
scala> val rerdd = rdd.repartition(2)

[Scala] 纯文本查看 复制代码
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26

4)查看新RDD的分区数
[Scala] 纯文本查看 复制代码
scala> rerdd.partitions.size

[Scala] 纯文本查看 复制代码
res23: Int = 2

13 coalescerepartition的区别
1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
2. repartition实际上是调用的coalesce,默认是进行shuffle的。源码如下:
[Scala] 纯文本查看 复制代码
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {  coalesce(numPartitions, shuffle = true)}

14 sortBy(func,[ascending], [numTasks]) 案例
1. 作用;使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
2. 需求:创建一个RDD,按照不同的规则进行排序
1)创建一个RDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(List(2,1,3,4))

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

2)按照自身大小排序
[Scala] 纯文本查看 复制代码
scala> rdd.sortBy(x => x).collect()

[Scala] 纯文本查看 复制代码
res11: Array[Int] = Array(1, 2, 3, 4)

3)按照与3余数的大小排序
[Scala] 纯文本查看 复制代码
scala> rdd.sortBy(x => x%3).collect()

[Scala] 纯文本查看 复制代码
res12: Array[Int] = Array(3, 4, 1, 2)

15 pipe(command, [envVars]) 案例
1. 作用:管道,针对每个分区,都执行一个shell脚本,返回输出的RDD
注意:脚本需要放在Worker节点可以访问到的位置
2. 需求:编写一个脚本,使用管道将脚本作用于RDD上。
1)编写一个脚本
Shell脚本
[Shell] 纯文本查看 复制代码
#!/bin/shecho "AA"while read LINE; do   echo ">>>"${LINE}done

2)创建一个只有一个分区的RDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24

3)将脚本作用该RDD并打印
[Scala] 纯文本查看 复制代码
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()

[Scala] 纯文本查看 复制代码
res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

4)创建一个有两个分区的RDD
[Scala] 纯文本查看 复制代码
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)

[Scala] 纯文本查看 复制代码
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24

5)将脚本作用该RDD并打印
[Scala] 纯文本查看 复制代码
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()

[Scala] 纯文本查看 复制代码
res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)


脚本执行结果

脚本执行结果

免费评分

参与人数 1吾爱币 +5 热心值 +1 收起 理由
苏紫方璇 + 5 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-26 03:23

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表