sparkstreaming小结

Dstream是spark streaming的一个基本抽象;如同rdd之于spark;只是一个类型的Dstream本质上是一系列连续的RDD,这些RDD只是要计算的数据不同,它们来自不同微批,源源不断。

sparkstreaming 的简单wordcount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.cht.day9
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by zx on 2017/10/17.
*/
object KafkaWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val zkQuorum = "192.168.153.201:2181"
val groupId = "g1"
val topic = Map[String, Int]("hello_topic" -> 1)//1 好像是指定的线程数
//创建DStream,需要KafkaDStream
val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)
//对数据进行处理
//Kafak的ReceiverInputDStream[(String, String)]里面装的是一个元组(key是写入的key,value是实际写入的内容)
//这个key的功能是到底要写到哪个分区里面,默认情况下没写就是空,空呢它使用的是轮询的方式将数据写到不同的分区里面;
//其实这个key的功能好比是以前往数据里面写可以根据这个key到底写到哪个分区,其实它内部也有一个类似于hash的一个算法,将我们的数据写到kafka的哪个分区里面
val lines: DStream[String] = data.map(_._2)
//对DSteam进行操作,你操作这个抽象(代理,描述),就像操作一个本地的集合一样
//切分压平
val words: DStream[String] = lines.flatMap(_.split(" "))
//单词和一组合在一起
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
//聚合
val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//打印结果(Action)
reduced.print()
//启动sparksteaming程序
ssc.start()
//等待优雅的退出
ssc.awaitTermination()
}
}

但是上面的程序只能统计一个微批次的结果,如果想要获得指定过去的几个微批的结果或者说要获取到历史总计聚合结果就需要能够进行有状态的编程。sparkstreaming提供了更新状态累加的updateStateBykey方法。

于是,wordcount可以改造成如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package cn.cht.day9
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by zx on 2017/10/17.
*/
object StatefulKafkaWordCount {
/**
* 第一个参数:聚合的key,就是单词
* 第二个参数:当前批次产生批次该单词在每一个分区(当前这个批次里也有很多分区)出现的次数
* 第三个参数:初始值或累加的中间结果
* option有2个子类,有就是some 没有就是none;第一次没有初始值就是none,以后有初始值就是some
*/
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
//iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
// t._2.sum + t._3.getOrElse(0))指的是当前批次结果与历史结果相加
//上面一行【iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))】也可以用下面的模式匹配的方法
iter.map{ case(x, y, z) => (x, y.sum + z.getOrElse(0))}
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("StatefulKafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
//如果要使用可更新历史数据(累加,因为需要把中间结果checkpoint 到磁盘上),那么就要把中间结果保存起来
ssc.checkpoint("./ck")//这里是指定的是当前的目录,以后提交集群运行应当指定一个hdfs目录
val zkQuorum = "192.168.153.201:2181"
val groupId = "g100"
val topic = Map[String, Int]("hello_topic" -> 1)
//创建DStream,需要KafkaDStream
val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)
//对数据进行处理
//Kafak的ReceiverInputDStream[(String, String)]里面装的是一个元组(key是写入的key,value是实际写入的内容)
val lines: DStream[String] = data.map(_._2)
//对DSteam进行操作,你操作这个抽象(代理,描述),就像操作一个本地的集合一样
//切分压平
val words: DStream[String] = lines.flatMap(_.split(" "))
//单词和一组合在一起
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
//聚合
val reduced: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
//ssc.sparkContext.defaultParallelism 是默认的分区数量; true指定以后依然使用这个分区器
//这边传了一个自己写的函数updateFunc,然后怎么就实现了累加上历史数据的功能的;好像有一部分逻辑封装到源码的val cleanedFunc = ssc.sc.clean(updateFunc)这一步里了吧----20200325更新:应该不是的。
//打印结果(Action)
reduced.print()
//启动sparksteaming程序
ssc.start()
//等待优雅的退出
ssc.awaitTermination()
}
}

但是上面程序仍然存在问题,当程序重启之后,不能接着上次消费的地方继续累加历史状态,而是重新以本次运行时间开始去进行累加的计算。

sparkStreaming和kafka整合的2种方式:

  • Receiver方式

    采用高级的api去接收kafka中的数据,然后以自动更新偏移量的方式去提交偏移量。而且是达到固定时间才进行处理。这种方式容易丢数据,原因有:1:receiver可用内存满了,无法继续接收后续数据;2:一旦接收失败,receiver内存中已经接收的数据没有持久化;为了避免丢失,当达到某个阈值后把前面的数据写到磁盘里面,先读的数据写到磁盘里面(也就是WAL机制);

  • 直连方式

    直连方式采用更底层的api,将kafka的分区数据直接对接到spark的分区上。来一条处理一条;手动维护偏移量,我理解如果是自动提交偏移量的话有可能出现:先处理完了后提交导致重复消费,先提交还没处理完导致数据丢失。自动提交偏移量的时延不好把握;当然先处理逻辑后手动提交也会有重复消费的问题,这个时候考虑把这两步结合到一个事务里来解决。而直连方式的累积需要结合外部的数据库(redis)去完成。