Spark 原理

18 Apr 2021

Spark 原理主要包括 :

Spark Runtime

1.Using spark-submit, the user submits an application.
2.In spark-submit, we invoke the main() method that the user specifies. It also launches the driver program.
3.The driver program asks for the resources to the cluster manager that we need to launch executors.
4.The cluster manager launches executors on behalf of the driver program.
5.The driver process runs with the help of user application. Based on the actions and transformation on RDDs, the driver sends work to executors in the form of tasks.
6.The executors process the task and the result sends back to the driver through the cluster manager.

核心组件

集群部署模式

Spark 支持 3 种集群管理器,分别为:

Yarn模式运行机制

Master & Worker 解析

Spark RPC 框架

RPC(Remote Procedure Call)远程过程调用。两台服务器A、B,A服务器上的应用,想要调用B服务器上应用提供 的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

如果把分布式系统(Hadoop、Spark等)比作一个人,那么RPC可以认为是人体的血液循环系统。它将系统中各个 不同的组件联系了起来。在Spark中,不同组件之间的通信、jar的上传、Shuffle数据的传输、Block数据的复制与备 份都是基于RPC来实现的,所以说 RPC 是分布式系统的基石毫不为过。

Spark 2.X 的 RPC 框架是基于优秀的网络通信框架 Netty 开发的。Spark RPC借鉴了Akka(2.X以前
)的中的设计,它是基于 Actor模型,各个组件可以认为是一个个独立的实体,各个实体之间通过消息来进行通信。具体各个组件之间的关系 如上图

Master 启动流程

Master继承了RpcEndpoint,实现了 RpcEndpoint 接口
Master的生命周期遵循 constructor -> onStart -> receive* -> onStop 的步骤
Master 的 onStart 方法中最重要的事情是:执行恢复

Master HA的实现方式:

Worker 启动流程

Worker继承了RpcEndpoint,实现了 RpcEndpoint 接口
Worker的生命周期遵循 constructor -> onStart -> receive* -> onStop 的步骤
Worker 的 onStart 方法中最重要的事情是: 向master注册

SparkContext

Spark应用程序的第一步就是创建并初始化SparkContext,SparkContext的初始化过程包含了内部组件的创建和准 备,主要涉及网络通信、分布式、消息、存储、计算、调度、缓存、度量、清理、文件服务和UI等方面。
SparkContext 是 Spark 程序主要功能的入口点,链接Spark集群,创建RDD、累加器和广播变量,一个线程只能运 行一个SparkContext。
SparkContext在应用程序中将外部数据转换成RDD,建立了第一个RDD,也就是说SparkContext建立了RDD血缘关 系的根,是DAG的根源。

SparkContext 内部

重要组件

SparkEnv重要组件

SparkEnv是spark计算层的基石,不管是 Driver 还是 Executor,都需要依赖SparkEnv来进行计算,它是Spark的执 行环境对象,其中包括与众多Executor执行相关的对象。Spark 对任务的计算都依托于 Executor 的能力,所有的 Executor 都有自己的 Spark 的执行环境 SparkEnv。

有了 SparkEnv,可以将数据存储在存储体系中;利用计算引擎对计算任务进行处理,可以在节点间进行通信等。

SparkContext启动流程

初始化步骤:

  1. 初始设置
  2. 创建 SparkEnv
  3. 创建 SparkUI
  4. Hadoop 相关配置
  5. Executor 环境变量
  6. 注册 HeartbeatReceiver 心跳接收器
  7. 创建 TaskScheduler、SchedulerBackend
  8. 创建和启动 DAGScheduler
  9. 启动TaskScheduler、SchedulerBackend
  10. 启动测量系统 MetricsSystem
  11. 创建事件日志监听器
  12. 创建和启动 ExecutorAllocationManager
  13. ContextCleaner 的创建与启动
  14. 自定义 SparkListener 与启动事件
  15. Spark 环境更新
  16. 投递应用程序启动事件
  17. 测量系统添加Source
  18. 将 SparkContext 标记为激活

三大组件启动流程

作业执行原理

任务调度概述

Spark 的任务调度可分为:Stage 级调度(高层调度)、Task级调度(底层调度)。总体调度流程如上图所示:

job触发

Action 操作后会触发 Job 的计算,并交给 DAGScheduler 来提交。

1、Action 触发 sc.runJob
2、触发 dagScheduler.runJob
3、dagScheduler.runJob 提交job,作业提交后发生阻塞,等待执行结果, job 是串行执行的。

Stage划分

Spark的任务调度从 DAG 划分开始,由 DAGScheduler 完成:

总体而言,DAGScheduler做的事情较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相关状态信息。

Task调度

Task 的调度是由 TaskScheduler 来完成(底层调度)。
DAGScheduler 将 Stage 打包到 TaskSet 交给TaskScheduler,TaskScheduler 会将 TaskSet 封装为 TaskSetManager 加入到调度队列中

TaskScheduler 初始化后会启动 SchedulerBackend

SchedulerBackend负责跟外界打交道,接收 Executor 的注册,维护 Executor 的状态。 SchedulerBackend 是个管理“资源”(Executor)的,它在启动后会定期地去“询问” TaskScheduler 有没有任务要运行。
大致方法调用流程如上图所示

将 TaskSetManager 加入 rootPool 调度池中之后,调用 SchedulerBackend 的 reviveOffers 方法给driverEndpoint 发送 ReviveOffer 消息;driverEndpoint 收到 ReviveOffer 消息后调用 makeOffers 方法,过滤出活跃状态的 Executor(这些 Executor都是任务启动时反向注册到 Driver 的 Executor),然后将 Executor 封装成 WorkerOffer 对 象 ; 准备好计算资源(WorkerOffer)后 , taskScheduler 基于这些资源调用resourceOffer 在 Executor 上分 配 task。

调度策略

TaskScheduler会先把 DAGScheduler 给过来的 TaskSet 封装成 TaskSetManager 扔到任务队列里,然后再从任务队 列里按照一定规则把它们取出来,由 SchedulerBackend 发送给Executor运行;
TaskScheduler 以树的方式来管理任务队列,树中的根节点类型为 Schedulable,叶子节点为 TaskSetManager,非叶子节点为Pool;
TaskScheduler 支持两种调度策略:FIFO(默认调度策略)、FAIR。

本地化调度

返回结果

对于Executor的计算结果,会根据结果的大小使用不同的处理策略:

失败重试与黑名单机制

Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend(DriverEndpoint);
SchedulerBackend 则告诉 TaskScheduler,TaskScheduler 找到该 Task 对应的 TaskSetManager,并通知到该 TaskSetManager,这样 TaskSetManager 就知道 Task 的失败与成功状态;

即 SchedulerBackend(DriverEndPoint) => TaskScheduler => TaskSetManager

对于失败的Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子 中,否则整个Application失败;

在记录 Task 失败次数过程中,会记录它上一次失败所在的Executor Id和Host。下次再调度这个Task时,会使用黑名 单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用;

Shuffle详解

在 Spark 或 MapReduce 分布式计算框架中,数据被分成一块一块的分区,分布在集群中各节点上,每个计算任务 一次处理一个分区,当需要对具有某种共同特征的一类数据进行计算时,就需要将集群中的这类数据汇聚到同一节 点。这个按照一定的规则对数据重新分区的过程就是Shuffle。

Spark Shuffle的两个阶段

对于Spark来讲,一些Transformation或Action算子会让RDD产生宽依赖,即Parent RDD中的每个Partition被child RDD中的多个Partition使用,这时需要进行Shuffle,根据Record的key对parent RDD进行重新分区。 以Shuffle为边界,Spark将一个Job划分为不同的Stage。Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步。

Spark 的 Stage 分为两种:

如果按照 map 端和 reduce 端来分析的话:

Spark Shuffle的流程简单抽象为以下几步:

Shuffle Writer

ShuffleWriter(抽象类),有3个具体的实现:

以上 ShuffleWriter 有各自的应用场景。分别如下:

Shuffle MapOutputTracker

Spark的shuffle过程分为Writer和Reader:

MapOutputTracker在executor和driver端都存在:

Shuffle Reader

Hadoop Shuffle 与 Spark Shuffle 的区别

共同点:
二者从功能上看是相似的;从High Level来看,没有本质区别,实现(细节)上有区别

实现上的区别:

Shuffle优化

Spark作业的性能主要就是消耗了shuffle过程,因为该环节包含了众多低效的IO操作:磁盘IO、序列化、网络数据传输等;如果要让作业的性能更上一层楼,就有必要对 shuffle 过程进行调优。 但必须注意的是,影响Spark作业性能的因素,主要还是代码质量、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。

开发过程中对 Shuffle 的优化:

Shuffle 的参数优化:

内存管理

在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程:

Driver 的内存管理(缺省值 1G)相对来说较为简单,这里主要针对 Executor 的内存管理进行分析,下文中提到的 Spark 内存均特指 Executor 的内存。

堆内内存与堆外内存

作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对JVM 的堆内(On-heap)空间进行 了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统 内存中开辟空间,进一步优化了内存的使用。

堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。

静态内存管理

Spark 2.0 以前版本采用静态内存管理机制。存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如上图所示。
可用的存储内存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
可用的执行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

systemMaxMemory 为当前 JVM 堆内内存的大小

这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并没有区别对待,和”其它内存”一样交给了 JVM 去管理。

静态内存管理机制实现起来较为简单,但如果用户不熟悉 Spark 的存储机制,或没有根据具体的数据规模和计算任 务或做相应的配置,很容易造成”一半海水,一半火焰”的局面,即存储内存和执行内存中的一方剩余大量的空间,而 另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。由于新的内存管理机制的出现,这种方式目前已 经很少有开发者使用,出于兼容旧版本的应用程序的目的,Spark 仍然保留了它的实现。

统一内存管理

Spark 2.0 之后引入统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,统一内存管理的堆内内存结构如上图所示

存储内存管理

RDD缓存的数据 & 共享变量

执行内存管理

执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程, Shuffle 的 Write 和 Read 两阶段对执行内存的使用:
Shuffle Write
* 在 map 端会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
Shuffle Read
* 在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间
* 如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间

在 ExternalSorter 和 Aggregator 中,Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数 据,但在 Shuffle 过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算, 当其大到一定程度,无法再从 MemoryManager 申请到新的执行内存时,Spark 就会将其全部内容存储到磁盘文件 中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并。

Spark 的存储内存和执行内存有着截然不同的管理方式:

BlockManager

BlockManager是一个嵌入在 Spark 中的 key-value型分布式存储系统,也是 Master-Slave 结构的,RDD-cache、shuffle-output、broadcast 等的实现都是基于BlockManager来实现的:

BlockManager也是分布式结构,在Driver和所有Executor上都会有BlockManager。每个节点上存储的block信息都 会汇报给Driver端的BlockManager Master作统一管理,BlockManager对外提供get和set数据接口,可将数据存储 在Memory、Disk、Off-heap。

BlockManager Master

Driver的组件为BlockManager Master,负责:

BlockManager

每个节点都有一个 BlockManager,每个 BlockManager 创建之后,第一件事就是去向 BlockManager Master 进行 注册,此时 BlockManager Master 会为其创建对应的 BlockManagerInfo。

BlockManager运行在所有的节点上,包括所有 Driver 和 Executor 上:

数据倾斜

基本概念

什么是数据倾斜? Task之间数据分配的非常不均匀

数据倾斜有哪些现象

数据倾斜造成的危害有哪些

为什么会发生数据倾斜

如何定位发生数据倾斜
凭借经验或Web UI,找到对应的Stage;再找到对应的 Shuffle 算子

数据倾斜处理

做好数据预处理:

数据倾斜产生的主要原因:Shuffle + key分布不均

处理数据倾斜的基本思路:

Spark优化

编码的优化

参数优化