wordcount生成几个RDD

wordCount执行过程详解

wc过程有几个RDD?

wc代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//创建spark配置,设置应用程序名字
//val conf = new SparkConf().setAppName("ScalaWordCount")
val conf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[4]")
//创建spark执行的入口
val sc = new SparkContext(conf)
//指定以后从哪里读取数据创建RDD(弹性分布式数据集)
val lines: RDD[String] = sc.textFile("hdfs://node-4:9000/wc1", 1)//no1
lines.cache()
//lines.partitions.length
//切分压平
val words: RDD[String] = lines.flatMap(_.split(" "))//no2
//将单词和一组合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))//no3
//按key进行聚合
val reduced:RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)//no4
reduced.saveAsTextFile(args(1))//no5
//释放资源
sc.stop()

no1 查看textFile源码
image

继续查看hadoopFile源码
image

可以看到会返回一个hadooprdd
同样可以看到line:1000 里的K是偏移量 V 是内容
但是ScalaWorldCount代码里的

1
val lines: RDD[String] = sc.textFile("hdfs://node-4:9000/wc1", 1)//no1

返回的RDD 只有String即内容 而没有偏移量;为什么呢?
原因就是下图还有一个map操作
image

所以说ScalaWordCount代码里的:

1
val lines: RDD[String] = sc.textFile("hdfs://node-4:9000/wc1", 1)//no1

就生成2个RDD(第一个是上面的hadoopfile 生成的rdd 另一个就是这里的map操作再生成一个RDD)

no2 和no3 再生成2个RDD

no4

查看reduceByKey源码,->reduceByKey->combineByKeyWithClassTag
来到PairRDDFunctions.scala 的combineByKeyWithClassTag方法

image

里面会生成一个shuffledRDD

no5
然后再看saveAsTextFile 他是一个action 但也会生成一个RDD(我感觉就是因为下面saveAsTextFile源码里面有个mapPartition)

查看跟踪saveAsTextFile源码

image

因此,总共6种RDD