0%

spark的任务构建和提交流程.md

[toc]

简述spark的任务运行流程#

先是在写spark代码的时候,背后做一些RDD的转换,写完后构建DAG,划分stage, 然后提交到资源管理器分配计算资源, 并在worker上执行。
在这里插入图片描述

首先写spark代码时离不开对RDD的调用,那么:

为什么需要RDD#

  1. 数据处理模型统一:
    RDD是1个数据结构, 能够获取数据的分区。
    不区分流式还是批式,只理解为1个数学模型。

  2. 依赖划分原则:
    RDD之间通过窄依赖(仅1个依赖)和宽依赖(多依赖)进行关联。
    为什么要划分依赖?
    依赖数量不同,决定是否能在1个stage和节点中执行。
    同时也决定了容灾策略,是否需要保存所有父RDD

  3. 数据处理效率:
    1个RDD,同时可在多个节点并发执行。

  4. 容错处理:
    RDD本身是不可变的数据集,这样可保证数据恢复在这里插入图片描述

wordCount代码的背后#

以wordCount代码为例

textFile#

第一步是读文件数据。

1
2
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
ctx.textFile(args[0],1)

这一步会生成HadoopRDD
在这里插入图片描述
这里注意下, 里面有一个清理序列化的操作, 分布式传输数据时,序列化很重要,而序列化时有些成员是无法被序列化的,在java中的关键字是transien.

MappedRDD是什么?
如下:
在这里插入图片描述
即我们的RDD,会被包装到一个单点依赖的对象里,并指明这是单点依赖。
并且textFile这个过程, 其实是生成了2个RDD, 1个是HadoopRDD,还有一个是读取数据转成字符串的mapRDD。 他们各自被塞进dependecy对象中,并通过依赖建立连接。

  • 注意,一般分布式计算设计DAG图时, 都是只有input指针(即用输入对象做连接), 利用input来确定DAG关系。 在spark里就是依赖dependency的概念

第二步做FlatMap#

接着我们调用flatMap

1
2
3
4
5
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>(){
@Override
public Iterable<String> call(String s){
return Arrays.asList(SPACE.split(s));
})

背后的RDD和依赖i情况变成如下:
在这里插入图片描述

第三步mapToPair#

懒得敲了,这里省略java代码。
在这里插入图片描述
看一下dag。
在这里插入图片描述

第四步聚合#

需要分组,然后进行合并相同的单词数量
在这里插入图片描述
此时dag如下:
在这里插入图片描述

  • 注意, 并不是shuffle这个RDD被包装进了shuffle依赖, 而是它的前置RDD被包进了shuffle依赖。
  • 即dependency确实是只包装依赖的, 你如果是属于某个shuffle过程的依赖,那么就会被包装成shuffleDepnecy。

最后一步collect#

当写完后,执行collet,进行计算执行和提交。
在这里插入图片描述

完整流程如下:
在这里插入图片描述

Collect的时候发生了什么#

collect后, 会通过dagScheduler进行runJob, submitJob的时候会返回一个waiter,在client端主程序中就会进行等待。
即client端提交任务时其实是异步的,会返回一个waiter进行等待,
在这里插入图片描述

看一下submitJob的时候发生了什么
在这里插入图片描述
前面看起来都是一些业务处理, 关键在handleJobSumitted的时候,会做一个newStage的操作,正好可以看一下spark里的stage是怎么确定和生成的。
在这里插入图片描述
父(依赖)stage列表是怎么获取的?
![在这里插入图片描述](https://img-blog.csdnimg.cn/20200701234813809.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2E3OTk1ODE
上图里的关键信息在于, 当遇到shuffle的时候,就会隔离出一个stage。

可以看一下之前提到的RDD拼成的DAG图,如下:
在这里插入图片描述

newStage后,有如下情况:
在这里插入图片描述
可以看到又给stage里有一个rdds的数组, 里面放了该stage的所有RDD, 并建立了依赖关系。
然后每个stage又通过parent去确定依赖关系。

stage提交#

newStage之后,会进行stage的提交
在这里插入图片描述

看一下submitStage的时候做了什么,注意此时是先从finalStage开始提交的。
在这里插入图片描述
这里可以看到, 虽然我们做了finalStage的提交, 但是会优先提交它所依赖的前置stage, 一直等待stage 完成了再真正提交自己这个stage。

这里看一下 stage是怎么发送的
在这里插入图片描述
上图可以看到stae座位task时,也是区分shuffle类型和map类型。
在这里插入图片描述

  • 这句话重点: 有多少个未计算的分区,决定了有多少个task, 即stage中的分区和task一一对应

完成的stage创建和提交流程图如下:
在这里插入图片描述