Spark Streaming

13 Apr 2021

官网: http://spark.apache.org/streaming/

Spark Streaming概述

什么是Spark Streaming

Spark Streaming类似于Apache Storm(来一条数据处理一条,延迟低,响应快,低吞吐量),用于流式数据的处理;
Spark Streaming具有有高吞吐量和容错能力强等特点;
Spark Streaming支持的数据输入源很多,例如:Kafka(最重要的数据源)、 Flume、Twitter 和 TCP socket等;
数据输入后可用高度抽象API,如:map、reduce、join、window等进行运算;
处理结果能保存在很多地方,如HDFS、数据库等;
Spark Streaming 能与 MLlib 以及 Graphx 融合。
Spark Streaming 与 Spark 基于 RDD 的概念比较类似;
Spark Streaming使用离散化流(Discretized Stream)作为抽象表示,称为DStream。
DStream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,DStream 是由这些 RDD 所组成的序列。
DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的 DStream 支持两种操作:

Spark Streaming架构

Spark Streaming使用 mini-batch 的架构,把流式计算当作一系列连续的小规模批处理来对待。(伪流式处理)
Spark Streaming从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔创建出来。

Spark Streaming 优缺点

与传统流式框架相比,Spark Streaming 最大的不同点在于它对待数据是粗粒度的处 理方式,即一次处理一小批数据,而其他框架往往采用细粒度的处理模式,即依次处 理一条数据。Spark Streaming 这样的设计实现既为其带来了显而易见的优点,又引 入了不可避免的缺点。

DStream基础数据源

基础数据源包括:文件数据流、socket数据流、RDD队列流;这些数据源主要用于测试。

引入依赖:

<dependency>  
    <groupId>org.apache.spark</groupId>  
    <artifactId>spark-streaming_2.12</artifactId>  
    <version>${spark.version}</version>  
</dependency>  

文件数据流

文件数据流:通过 textFileStream(directory) 方法进行读取 HDFS 兼容的文件系统文件

Spark Streaming 将会监控 directory 目录,并不断处理移动进来的文件

import org.apache.log4j.{Level, Logger}  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.dstream.DStream  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
  
object FileDStream {  
  def main(args: Array[String]): Unit = {  
    Logger.getLogger("org").setLevel(Level.WARN)  
  
    val conf: SparkConf = new SparkConf()  
      .setAppName(this.getClass.getCanonicalName.init)  
      .setMaster("local[*]")  
  
  
    val ssc = new StreamingContext(conf, Seconds(10))  
  
    // 1. create DStream  
    // 只会读取新创建,或移动进来的文件(验证时间戳)  
    val lines: DStream[String] = ssc.textFileStream("data/log/")  
  
    // 2. DStream transform  
    val result: DStream[(String, Int)] = lines  
      .flatMap(_.split("\\s+"))  
      .map((_, 1))  
      .reduceByKey(_ + _)  
  
    // 3. DStream output  
    result.print(20)  
  
    // 4. start job  
    ssc.start()  
    // 挂起,不调用会直接结束进程  
    ssc.awaitTermination()  
  }  
}  

Socket数据流

Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理;

新开一个命令窗口,启动 nc 程序:

nc -lk 9999  
# yum install nc  

随后可以在nc窗口中随意输入一些单词,监听窗口会自动获得单词数据流信息,在 监听窗口每隔x秒就会打印出词频统计信息,可以在屏幕上出现结果。

备注:使用local[],可能存在问题。 如果给虚拟机配置的cpu数为1,使用local[]也只会启动一个线程,该线程用于receiver task,此时没有资源处理接收达到的数据。 【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有效信息】

注意:DStream的 StorageLevel 是 MEMORY_AND_DISK_SER_2;

  
// 修改前面fileTextStream的数据源,其他测试代码相同  
  
val lines: DStream[String] = ssc.socketTextStream(  
  "localhost",  
  9999,  
  StorageLevel.MEMORY_AND_DISK_SER_2)  
  
  

RDD队列流

调试Spark Streaming应用程序的时候,可使用streamingContext.queueStream(queueOfRDD) 创建基于RDD队列的DStream;

备注:

每秒创建一个RDD(RDD存放1-100的整数),Streaming每隔1秒就对数据进行处 理,计算RDD中数据除10取余的个数。

import org.apache.log4j.{Level, Logger}  
import org.apache.spark.SparkConf  
import org.apache.spark.rdd.RDD  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming.dstream.{DStream, InputDStream}  
  
import scala.collection.mutable  
  
object RDDDStream {  
  def main(args: Array[String]): Unit = {  
    Logger.getLogger("org").setLevel(Level.ERROR)  
  
    val conf: SparkConf = new SparkConf()  
      .setAppName(this.getClass.getCanonicalName.init)  
      .setMaster("local[*]")  
  
  
    val ssc = new StreamingContext(conf, Seconds(1))  
  
    // 1. create DStream  
    val queue = new mutable.Queue[RDD[Int]]()  
  
    val queueDStream: InputDStream[Int] = ssc.queueStream(queue)  
  
    // 2. DStream transform  
    val result: DStream[(Int, Int)] = queueDStream.map(elem => (elem % 10, 1))  
      .reduceByKey(_ + _)  
  
    // 3. DStream output  
    result.print()  
  
    // 4. start job  
    ssc.start()  
  
    for (i <- 1 to 5) {  
      // 涉及到同时出队和入队操作,所以要做同步  
      queue.synchronized{  
        val range = (1 to 100).map(_*i)  
        queue += ssc.sparkContext.makeRDD(range, 2)  
      }  
      Thread.sleep(1000)  
    }  
  
    ssc.stop()  
  
  }  
}  
  

DStream转换操作

DStream上的操作与RDD的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的方法,如: updateStateByKey、transform 以及各种 Window 相关的操作。

备注:

DStream 的转化操作可以分为 无状态(stateless) 和 有状态(stateful) 两种:

无状态转换

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化
DStream 中的每一个 RDD。 常见的无状态转换包括:map、flatMap、filter、repartition、reduceByKey、
groupByKey;直接作用在DStream上

重要的转换操作:transform。通过对源DStream的每个RDD应用RDD-to-RDD函 数,创建一个新的DStream。支持在新的DStream中做任何RDD操作

这是一个功能强大的函数,它可以允许开发者直接操作其内部的RDD。也就是说开 发者,可以提供任意一个RDD到RDD的函数,这个函数在数据流每个批次中都被调 用,生成一个新的流。

过滤黑名单案例

import org.apache.log4j.{Level, Logger}  
import org.apache.spark.SparkConf  
import org.apache.spark.rdd.RDD  
import org.apache.spark.sql.{DataFrame, SparkSession}  
import org.apache.spark.streaming.dstream.ConstantInputDStream  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
  
object BlackListFilter1 {  
  def main(args: Array[String]): Unit = {  
    Logger.getLogger("org").setLevel(Level.ERROR)  
  
    val conf: SparkConf = new SparkConf()  
      .setAppName(this.getClass.getCanonicalName.init)  
      .setMaster("local[*]")  
  
    val ssc = new StreamingContext(conf, Seconds(1))  
  
    /**  
     * 黑名单过滤,在blackList中,为true的key会被过滤掉  
     */  
    val blackList = Array(("spark", true), ("scala", false))  
    val blackListRDD: RDD[(String, Boolean)] = ssc.sparkContext.makeRDD(blackList)  
  
    val strArray: Array[String] = "Hello World Hello scala Hadoop Hello spark kafka hive zookeeper hbase flume sqoop"  
      .split("\\s+")  
      .zipWithIndex  
      .map{case (k, v) => s"$v $k"}  
    val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)  
  
  
    // ConstantInputDStream  
    // An input stream that always returns the same RDD on each time step. Useful for testing.  
    val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)  
  
    // 1. 方法一 使用RDD Join  
    // transform Return a new DStream in which each RDD is  
    // generated by applying a function * on each RDD of 'this' DStream.  
    wordDStream.transform{rdd =>  
      val result = rdd.map{ line =>  
        (line.split("\\s+")(1), line)  
      }.leftOuterJoin(blackListRDD)  
          .filter{case (_, (_, right)) => !right.getOrElse(false)}  
          .map{case (_, (left, _)) => left}  
      result  
    }.print(20)  
  
  
    // 2. 方法二 使用sql join  
    wordDStream.map(line => (line.split("\\s+")(1), line))  
      .transform{rdd =>  
        val spark = SparkSession  
          .builder()  
          .config(rdd.sparkContext.getConf)  
          .getOrCreate()  
        import spark.implicits._  
  
        val wordDF: DataFrame = rdd.toDF("word", "line")  
        val blackListDF = blackListRDD.toDF("word", "flag")  
        wordDF  
          .join(blackListDF, Seq("word"), "left_outer")  
          .filter("flag is null or flag = false")  
          .select("line")  
          .rdd  
      }.print(20)  
  
    // 3. 方法三 直接使用filter  
    wordDStream.map(line => (line.split("\\s+")(1).toLowerCase, line))  
      .filter{case (word, _) => !blackList.filter(_._2).map(_._1).contains(word)}  
      .map(_._2)  
      .print(20)  
  
  
    ssc.start()  
    ssc.awaitTermination()  
  }  
}  

有状态转换

有状态的转换主要有两种:窗口操作、状态跟踪操作

DStream输出操作

输出操作定义 DStream 的输出操作。
与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执 行输出操作,那么这些 DStream 就都不会被求值
如果 StreamingContext 中没有设定输出操作,整个流式作业不会启动。

通用的输出操作 foreachRDD,用来对 DStream 中的 RDD 进行任意计算。在 foreachRDD中,可以重用 Spark RDD 中所有的 Action 操作。需要注意的点:

与Kafka整合

官网:http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html

针对不同的spark、kafka版本,集成处理数据的方式分为两种:
Receiver ApproachDirect Approach,不同集成版本处理方式的支持,可参考上图

Kafka-08 接口(了解)

Kafka-010 接口

Spark Streaming与kafka 0.10的整合,和0.8版本的 Direct 方式很像。Kafka的分区 和Spark的RDD分区是一一对应的,可以获取 offsets 和元数据,API 使用起来没有 显著的区别。

添加依赖:

<dependency>  
    <groupId>org.apache.spark</groupId>  
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>  
    <version>${spark.version}</version>  
</dependency>  

不要手动添加 org.apache.kafka 相关的依赖,如kafka-clients。spark-streaming- kafka-0-10已经包含相关的依赖了,不同的版本会有不同程度的不兼容。

Offset 管理(010)

Spark Streaming集成Kafka,允许从Kafka中读取一个或者多个 topic 的数据。一个 Kafka Topic包含一个或多个分区,每个分区中的消息顺序存储,并使用 offset 来标 记消息的位置。开发者可以在 Spark Streaming 应用中通过 offset 来控制数据的读 取位置。

Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性是非常重要的。如果 在应用停止或报错退出之前没有将 offset 持久化保存,该信息就会丢失,那么Spark Streaming就没有办法从上次停止或报错的位置继续消费Kafka中的消息。