Spark 5:Spark Core 内核调度

chatgpt/2023/9/27 15:31:22

DAG
Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的资源高效地完成任务计算。

 

f7522e1437864665a29ee5b880091a0f.png

43922b7dbf7e443d9bc027b8eb98bbdb.png

15779273999d4f66a9a20633f74057fe.png

5be8f9f537934951bb8ddaa4c243170d.png

8ee312eea62148df974f4223c628377e.png

a78adb3bf058417a99ed043987074ec8.png

68755757d7cc41dcab9f3efcd522d77a.png

DAG的宽窄依赖和阶段划分

2c79eedc54fb4eaa97ede39c43457c43.png

ba46cf039429404f84128b3eace7a521.png

e3c2e36941f4465fa804d15984a10f7e.png

3cc03cdbb5744299a3d61f81ff26d6ac.png

内存迭代计算

17731734dc2c4fbe84affbc2c902e14a.png

caace358e9b149fb8bca5655a6b00275.png

17cf980bd4fe417b9768cb6b691279a6.png

Spark 并行度

8c564b4080754fbf822a3b0feec1d360.png

e6fd1101c29a4786be19c7a9b6add2f0.png

d3e9484e08034b05a334567dde8c06e9.png

e04a272b65b446cbab955c4a6b499c3f.png

d4a5c2e4674041699fe3bc38e5cae53d.png

b78cf6d47dc24dd1be337fdb18b80dc7.png

c1d7340c955340119699f5413fc7e1fd.png

c4a9c8b512d84614983c9e260698d751.png

Spark运行中的概念名词大全

b7f1bd1529f84502bc105c8f2c306f6c.png

d2e0d281631c4000ac291aa7394562d1.png

Spark Shuffle

首先回顾MapReduce框架中Shuffle过程,整体流程图如下:

aa181b1ab2994ebaa7bba6cb06c2964e.png

Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等。

3c0878de8d4d4230a45beb5242429f25.png

Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步。

执行Shuffle的主体是Stage中的并发任务,这些任务分ShuffleMapTask和ResultTask两种,ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。如果要按照map端和reduce端来分析的话,ShuffleMapTask可以即是map端任务,又是reduce端任务,因为Spark中的Shuffle是可以串行的;ResultTask则只能充当reduce端任务的角色。

d6e463cbc7be468aa5d1b17d9ac07d79.png

Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式,到1.1版本时参考Hadoop MapReduce的实现开始引入Sort Shuffle,在1.5版本时开始Tungsten钨丝计划,引入UnSafe Shuffle优化内存及CPU的使用,在1.6中将Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式,到的2.0版本,Hash Shuffle已被删除,所有Shuffle方式全部统一到Sort Shuffle一个实现中。

在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种。
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManagerHashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。
因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

Hash Shuffle
Shuffle阶段划分:
shuffle write:mapper阶段,上一个stage得到最后的结果写出
shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并

1)未经优化的hashShuffleManager:
HashShuffle是根据task的计算结果的key值的hashcode%ReduceTask来决定放入哪一个区分,这样保证相同的数据一定放入一个分区,Hash Shuffle过程如下:

d89dc7cdb26342ae8227f30882a02011.png

根据下游的task决定生成几个文件,先生成缓冲区文件在写入磁盘文件,再将block文件进行合并。
未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。提出如下解决方案
2)经过优化的hashShuffleManager:
在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,每一个Group磁盘文件的数量与下游stage的task数量是相同的。

a7dee6741c35492d85185db81994d402.png

未经优化:
上游的task数量:m
下游的task数量:n
上游的executor数量:k (m>=k)
总共的磁盘文件:m*n
优化之后的:
上游的task数量:m
下游的task数量:n
上游的executor数量:k (m>=k)
总共的磁盘文件:k*n

Sort Shuffle Manager
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle write task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

ea5e1980f28641b5be8975e868736c97.png

(1)该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。
(2)接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
(3)排序:在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。
(4)溢写:排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
(5)merge:一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并成1个磁盘文件,这就是merge过程。
由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

Sort Shuffle bypass机制
bypass运行机制的触发条件如下:
1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
2)不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。

28024cc33f1440ceb4e817cee1f55a6e.png

此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:
第一,磁盘写机制不同;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

总结:
SortShuffle也分为普通机制和bypass机制。
普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。
而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。

Shuffle的配置选项
Shuffle阶段划分:
shuffle write:mapper阶段,上一个stage得到最后的结果写出
shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并

Shuffle的配置选项:

spark 的shuffle调优:主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等
spark.shuffle.file.buffer
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写
到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight:
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M)
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现
,合理调节该参数,性能会有1%~5%的提升。
spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait:
spark.shuffle.io.maxRetries :shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试
的最大次数。(默认是3次)
spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔。
spark.shuffle.memoryFraction:
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作内存比例。
spark.shuffle.manager
参数说明:该参数用于设置shufflemanager的类型(默认为sort).Spark1.5x以后有三个可选项:
Hash:spark1.x版本的默认值,HashShuffleManager
Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass 机制
spark.shuffle.sort.bypassMergeThreshold
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些

DAG是什么有什么用? 
DAG有向无环图, 用以描述任务执行流程,主要作用是协助DAG调度器构建Task分配用以做任务管理
内存迭代\阶段划分?
基于DAG的宽窄依赖划分阶段,阶段内部都是窄依赖可以构建内存迭代的管道
DAG调度器是?
构建Task分配用以做任务管理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.exyb.cn/news/show-5312864.html

如若内容造成侵权/违法违规/事实不符,请联系郑州代理记账网进行投诉反馈,一经查实,立即删除!

相关文章

一篇关于预测“未来”的教程:运行在 Intel AIxBoard™ 开发板上的 TDengine

英特尔数字化开发套件 AIxBoard 是一款 AI 架构的人工智能嵌入式开发板,体积小巧功能强大,可以在时序数据预测、图像分类、目标检测分割和语音处理等应用中并行运行多个神经网络。作为一款面向专业创客、开发者的功能强大的小型计算机,借助开…

接口自动化测试平台

下载了大神的EasyTest项目demo修改了下<https://testerhome.com/topics/12648 原地址>。也有看另一位大神的HttpRunnerManager<https://github.com/HttpRunner/HttpRunnerManager 原地址>&#xff0c;由于水平有限&#xff0c;感觉有点复杂~~~ 【整整200集】超超超…

油封属于唇形密封件吗?

油封是否与唇形密封相同的问题是工程和机械应用领域的一个常见问题。要回答这个问题&#xff0c;了解两种密封件的功能和特性非常重要。 首先我们来定义一下什么是油封。油封&#xff0c;也称为轴封&#xff0c;是一种用于防止流体(通常是油)从旋转或移动部件泄漏的装置。它的…

【Rust学习 | 基础系列3 | Hello, Rust】编写并运行第一个Rust程序

文章目录 前言一&#xff0c;创建项目二&#xff0c;两种编译方式1. 使用rustc编译器编译2. 使用Cargo编译 总结 前言 在开始学习任何一门新的编程语言时&#xff0c;都会从编写一个简单的 “Hello, World!” 程序开始。在这一章节中&#xff0c;将会介绍如何在Rust中编写并运…

Qt信号与槽机制的基石-MOC详解

引入 上篇讲到了信号与槽就是实现的观察者模式&#xff0c;那具体如何生成映射表就是moc做的事情。 一、moc简介 1. moc的定义 moc 全称是 Meta-Object Compiler&#xff0c;也就是“元对象编译器”&#xff0c;它主要用于处理C源文件中的非标准C代码。Qt 程序在交由标准编…

Django用户登录验证和自定义验证类

一、FBV 用户登录验证 1.1 登录验证并加入 session 用户登录时&#xff0c;使用 authenticate 验证用户名和密码是否正确&#xff0c;正确则返回一个用户对象。 用户名默认的字段名是 username 密码默认的字段名是 password 将已验证的用户添加到当前会话(session)中&#x…

vue项目环境 搭建

1、安装nodejs 2、安装vue-cli, npm i -g vue/cli-init 3、初始化项目 vue init webpack test 4、运行 cd test npm run dev

Go Ethereum源码学习笔记000

Go Ethereum源码学习笔记 前言时代的弄潮儿: Blockchain为什么要研究以太坊& Go-Ethereum 的原理 前言 这个专栏的内容是免费的&#xff0c;因为自己这边都是基于开源库和开源内容整理的学习笔记&#xff0c;在这个过程中进行增删改查&#xff0c;将自己的理解融入其中&am…
推荐文章