0%

据说是个新手情感本,因为整个剧情基本就是线性发展,需要玩家发挥才能往下走的情节不存在的。

这是一个推理+情感本

前面大家都是家具的情况下, 破解3起“凶杀案”,过程挺欢乐。(我推理出了 第一幕里镜头翻转的可能,算是有点小成就,后面就没怎么推出来了)

1706975275182

但是一想到那玩之前详细的问卷,感觉不太对劲, 果然大家这些“念魂”都是有故事的,并且还原故事的过程中也需要推理。

我那个角色看起来是个劲爆的南桐,最后发现是个乌龙, 对方是个女同,但我却以为对方是男, 于是对 对方撒谎自己是女,对方也撒谎自己是男,两边都撒谎。 而我虽然很早就看到了“撒谎”这个点,却没有推理出来, 最后只能被扶着强行推出来,可惜了。(关于自己的角色部分我只推理出了我是个精神病, 其实应该更细致地读一读文字,看看哪些是没有写明的,其中最关键的就是性别)

当然本子里我那个角色的虐恋还是挺戳我的, 以及父亲爷爷之间的故事也很感人,配合着音乐真的有点绷不住想落泪,但还是忍住了。(也可能是我打的少)

然后最后全员奉献成就爱哭鬼也挺戳的,5保一yyds。

1706975242801

不过问题也是有, 就是后面的人物故事出来时, 是直接6个人按顺序分享一大顿文字,其中包含5-6条线,导致非常混乱,听得很累,如果能优化一下这个故事的叙述顺序就好了,分阶段或者分故事线叙述。 同时有些剧情还是比较雷的,特别是外婆那条线, 争议挺大,也难怪会特地避免让女生来做这个角色,避免火气太大

据说是个新手情感本,因为整个剧情基本就是线性发展,需要玩家发挥才能往下走的情节不存在的。

这个是一个硬核的推理本,存在传统的凶手。

其中我的表现状态不太好,感觉啥也没推出来,体验感觉也说不上好

一方面,可能是宅主人(叔叔)死亡的线索出得太早,导致大家都混乱了(实际上叔叔死亡是下一个章节的事情), 抛出的线索过多信息量过大,也缺少一个核心来梳理整个事情脉络。另一方面也是 我一直把他当作变格本,总是认为存在时间穿梭啥的,实际上就是利用了经典的诡计: 失忆、 精神错乱 、 镜像(双胞胎) 以及时间篡改(本中出现多个时间,则肉眼看到的时间一定存在被篡改的可能), 后面要多注意这些诡计的总结,以后才好更好地打硬核本。

但总体来说还行,至少能够自圆其说,算是一个完整的剧情

1706873269677

大多数人在学会游泳前都不想游泳,人生来就是要在陆地上生存的。

同样的,人也不想思考,毕竟我们是为了活着而生的,而不是为了思考而生的,总是想在思考的世界中走的很远,等同于从陆地换成了水,迟早要把自己淹死


整个一代人正好被裹挟在两个时代之间,裹挟在两种截然不同的生活方式之间,乃至失去了全部常识、伦理、安全感和率真之心。

[toc]

RDD缓存管理cacheManager#

当需要计算RDD时,需要避免重复计算的RDD。

  • 什么时候RDD可能会被重复计算?一般是宽依赖RDD, 即RDD的下游可能有多个, 但是另一个下游的拉去可能较慢, 那么此时需要做缓存。
    cacheManager只是对RDD的管理, 真正的缓存以及获取是通过blockManager,然后根据内存情况选择存内存还是存磁盘。
    在这里插入图片描述
  • RDD不是一定会做缓存,这取决于存储级别的设定。
  • RDD没缓存时, 不一定要重新计算, 也可能从CheckPoint中拿
  • checkPoint概念:

checkpoint在spark中主要有两块应用:一块是在spark core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;另外一块是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息,以便在Driver崩溃重启的时候能够接着之前进度继续进行处理(如之前waiting batch的job会在重启后继续处理)。

如果需要存入内存,直接使用memoryStore即可,memoryStore的存储过程见上一篇博文。

如果要写入磁盘,需要调用diskStore提供的put方法把RDD对应的block块写入磁盘

diskStore和diskBlockManager有什么关系?#

diskStore里要写入数据时,负责打开某个文件, 然后往文件里写入。
取出数据时,也是找到对应的文件,然后取出数据。

而这个磁盘文件的管理并没有放到diskStore里实现,而是独立了一个diskBlockManager模块。
以DiskStore的putArray方法为例,从下图可看出关系:
在这里插入图片描述
即文件相关、序列化相关,都让diskBlockManager来搞了。

diskBlockManager的getFile过程#

上图里有个getFile的操作,即从DBM中拿到文件对象做写入。
那么这个文件创建时,怎么选路径,怎么命名?

首先,文件的路径和文件名, 使用2次哈希得到
在这里插入图片描述
使用2级哈希做路径的目的,是因为一级目录有多个,需要用哈希选择放到哪个一级目录。

每次创建文件的话,会把该文件放到DBM里的一个数组中,并加上钩子做管理,如果程序中止或者结束,需要主动清理临时文件。
在这里插入图片描述
DiskBlockManager全图:
在这里插入图片描述

[toc]

书中以经典的wordCount为例子
wordCount就是计算文本中a-z字母的个数,利用分布式计算的能力

mapreduce做wordCount#

在这里插入图片描述
标红的地方就是关键点, mapreducer做map后直接落盘了, 落盘后进行排序,排序完取出相同键的组发送,到resultTask做聚合计算。
其实不太懂mapreduce早期为什么这么做,是因为那时候还不知道怎么在内存和磁盘间切换吗?

早期spark的wordCount计算#

在这里插入图片描述
标红的地方是一个关键点。
map后优先根据key存到内存中, 并拆成1个个bucket, bucket就是之前提过的blockManager来管理。

要点#

  1. 做聚合计算(例如分组计数)前,需要先根据键值哈希,再进行shuffle。
  2. 先放内存进行分组哈希,如果实在放不下再放磁盘,需要引入BlockManager

[toc]

当spark中做完一次map操作,准备发给下游时,究竟会做什么事呢?我按照一些问题来逐步分析。

首先有个问题:map操作之后,数据是直接缓存到内存或者磁盘,等待下游client来拉取吗?

spark是批处理,假设正好map处理完一批数据,会调用insertAll方法去做缓存,然而缓存并不是那么简单的存储,而是如下:
在这里插入图片描述
可以看到,spark会判断这个map操作之后,是否会接一个聚合的操作,如果有,那么会在缓存并准备发给下游时时,提前做好聚合操作, 否则就是简单缓存。

我们先看下简单缓存的分支:
在这里插入图片描述
可以看到在这里地方会做容量判断,如果发现容量不足了,则会试图扩容,看下调用growArray会发生什么:
在这里插入图片描述
注意这里有个关键词是预测, spark为了防止出现omm,都是基于预测机制进行内存管理的。看下maybeSpillCollection具体在做什么
在这里插入图片描述
当发现内存大小可能不足,就会试图把内存数据放到磁盘。
然而放到磁盘也不是那么简单地去放,他判断了一个叫byPassMergeSort,这个是什么呢?

可以理解为是否需要提前为下游做好分区的磁盘排序。
每次存盘都会排序吗?
不是的,为了避免不必要的排序,他用bypassMergeThold这个阈值来确认,如果分区数量大于阈值,此时不做合并的话,可能导致频繁的磁盘IO取数据! 所以他做了如下操作:
在这里插入图片描述
临时文件做排序的图解如下:
在这里插入图片描述
如果上图不好理解,可以看下面这个图,更详细地解释了如果用做临时文件加合并的:
在这里插入图片描述

如果分区文件比较少,就不做排序+合并了,直接落盘让下游过来取
在这里插入图片描述
接着看需要在map后做聚合的情况:
在这里插入图片描述
可以看到所谓的聚合,必定会经历分组+ 聚合2个操作。看下是如何做mergeValue的:
在这里插入图片描述

Q:这里为什么要做采样来判断内存是否会超呢?
A:因为当前聚合操作都是在内存中进行的,而map的数据是一块块计算出来的,如果这个聚合的key取的有问题,导致分组聚合后的数据总大小几乎没变化,就可能导致内存里分组后的那堆数据越积越大。

Q:为什么要用采样队列来预测聚合过程的大小?
A:书里没讲原因,个人理解,如果map的块数比较多,这种采样预测的方式是比较简单且消耗计算少的, 根据这个采样大小,后面会用于做内存预估。


接着继续看下怎么做mergeValue的细节:
在这里插入图片描述
其实就是非常常见的一个哈希表计算的过程。
为什么中间那个分支里, 要叠加2pos+1的value值呢?
这是因为为了节省空间,把key和value两两放在一起,如图所示:
在这里插入图片描述
另外每次新增key的时候,会需要扩容,看下扩容怎么实现的:
在这里插入图片描述
可以看到非常简单的一种 每次
2后,复制扩容的过程。


spark做map过程时的计算缓存结果完整图解如下:
在这里插入图片描述
小结一下map后做缓存处理的要点:

  1. 如果map后发给下游时是需要做聚合的操作,则提前做一下聚合处理。
    聚合处理时有以下特点:
    ①每次聚合计算后,都会采样内存变化大小,方便预测内存情况
    ②使用哈希表做聚合计算
    ③哈希表支持扩容,容量每次为*2
  2. 如果map后不需要做聚合,则会缓存结果(注意聚合后同样也是会缓存,只是结果变成聚合后的了), 但如果发现预测的内存不够了,则会把数据存盘, 存盘时的注意点如下:
    ①分区数比较少,则直接落盘
    ②分区数较多,1个分区1个文件的话太多了,磁盘IO消耗大,因此会把分区排序并合并

[toc]

首先回顾一下memoryStore是做什么的。
他主要是将没有序列化的java对象数组或者序列化的byteBuffer放到内存中。
但是这就涉及到一些内存管理的问题,如果放不下,是不是要放磁盘?什么时候认为放不下?这里会一一解读。

MemoryStore的putIterator#

这个方法是把一堆values的数组内容放入内存中(本质上就是放到Map<blockId, blockEntry>中。
如果发现内存足够,能够申请,则调用putArray把数据写入内存(就是放到map中), 否则就去调用diskStore的接口写入磁盘中。
在这里插入图片描述
这里我先打住,不直接往下讲,而是给自己假设场景,如果是自己在开发计算引擎,写executor里的block缓存,肯定需要思考这个问题:

什么时候认为内存是足够的?#

最简单的一个做法:

  1. 我给每个memoryStore设定一个阈值MaxMemory,
  2. 维护一个值currentMemory, 这个值就是memoryStroe里那个Map<BlockId,memoryEntry>所占的大小。
  3. 然后遍历计算一下输入参数values所占的内存大小 needMemory
  4. 如果needMemory > maxMemory - currentMemory, 则认为内存不足,写入到磁盘。

这个做法相当于直接把整个values大小都计算好之后,如果ok,马上进行写入内存操作。
如果是memoryStore是单线程的模块那ok, 但如果这个putIterator是一个支持多线程写入的模块呢?
当我觉得100M足够,我写入,可能得花10s, 然后另外一个线程也觉得100M足够,也要写入,结果写到一半发现内存不够,就尴尬了。
因此问题变为:

多线程时,如果确保计算的内存量是有效的?#

一种方式,就是每次确定要写入时, 把要写入的这100M的量直接加到currentMemory中。 后面的线程要判断时,直接拿最新的curentMemory判断。
但实际上这个数据并没有真正写入map中, 有可能中间出现写入失败或者线程中断, 那这时候已经被处理过的currentMemory就不好搞了。

所以引入一个概念,叫展开内存unrollMemory。
每个线程都有自己的unrollMemory, 可以理解为该线程 准备 写入到内存中的大小。
因此我们统计剩余可写入内存时, 实际上是等于 MaxMemory - currentMemory - 所有线程unrollMemory总和。

但是我们又不能让线程展开的这个值正好把剩余内存占满,所以会设定一个展开内存总和maxUnrollMemory,替代MaxMemory。
因此此时我这个线程可用的剩余内存space,实际上为
maxUnrollMemory - cyrrentUnrollMemory。
在这里插入图片描述

但问题又来了,如果我们假想的可分配内存比实际剩余内存小,怎么办?如下图:
在这里插入图片描述

一种方式,是发现假想剩余内存小于实际剩余内存时,认为内存不足,把数据写入磁盘。
但有个问题, 假设我需要写入100M, 实际剩余内存是98M, 其实只差了2M, 那为什么不能挤挤呢?只差2M了啊哥!

然而我肯定不能去动其他线程的unrollMemory,毕竟人家都认为自己是ok的准备写入了,你总不能插队吧?如果能动其他线程准备写入的数据,这管理就太复杂了。
因此我们需要去已使用内存MemoryEntry里面找, 找一下是不是有比较小的block块,比如有一个块只有5M, 那我就把这个block块放入磁盘,那么我就可以塞进去了!

解答完上述问题后,再学习memoryStore的内存写入管理机制,就容易多了。

memoryStore完整安全展开流程#

1. 计算需要写入的内存大小,是否需要申请新内存#

在这里插入图片描述
这里的计算不同于上文中提到的直接遍历完之后判断总大小
因为当时传入的是一个迭代器,只能迭代一次,每次迭代时都需要放入vector这个临时存储的列表中,万一超级大,放入vector时超出范围就GG了, 所以它实际时每隔一段就会检查一下是否超出阈值。

2. 计算剩余可用的展开空间#

在这里插入图片描述
spark就是上文提到的这个:
在这里插入图片描述
如果小于实际内存,那么就需要去已分配的内存中找,看下能不能抽一些小朋友去磁盘中。

spark不足时,检查能否抽一些已分配内存区磁盘#

核心方法来自ensureFreeSpace,看下它的实现
在这里插入图片描述
这个过程比较简单,也没做太多优化,不考虑最优情况,否则会有排序的性能问题。
如果发现抽内存也不够用, 那就直接认为不行了。
如果ok,那就认为可行,
在这里插入图片描述

内存足够分配,写入#

在这里插入图片描述
最后会返回一个vector数据
这个vector会拿去做真正的写入操作。
在这里插入图片描述

完整过程:#

在这里插入图片描述

[toc]

local部署模式#

首先spark运行时有4个角色,如下:
在这里插入图片描述
Driver:应用驱动程序, 是spark集群的客户
Master:Spark的主控节点,是spark集群的老板
Worker:Spark的工作节点,是集群的各个节点主管
Executor:Spark的工作进程,由worker监管,负责具体任务的执行

简单local模式运行流程(无集群)#

我们先看下启动任务前, driver和executor之间,简单发生了什么(注意local模式下,executor是在driverApp里面的):
在这里插入图片描述
localBackend可以理解成1个客户端
localActore可以理解成1个消息处理器,在这里处理消息并调用对应对象的方法

可以看到启动前,需要先向集群中心申请资源,足够的时候才调用executor的launTask启动任务。

看下任务启动后,发生了什么:
在这里插入图片描述

可以看到executor会不断更新当前的任务运行状态,并发送出去做状态更新。

local-custer运行流程#

和简单local相比,有如下区别:
在这里插入图片描述
多了master角色和worker角色。
看一下master的创建流程:
在这里插入图片描述
可以看到首先进行了master注册,即告知启动了一个master。
从图中可以看到几个关键点:

  1. 为了可靠性,master要选举出主备
  2. 为了主备能顺利切换,会做信息持久化。
  3. actorSystem会定期发消息通知master做检测。
    即检测的定时器是在actorSytem中启动的,而不是master自身启动的。 (为什么呢,如果as挂了,不是所有的心跳检测都无法进行了吗?)

master对worker的离线检测机制#

看一下master的离线检测机制:
在这里插入图片描述
在这里插入图片描述
可以看到是确认离线后,会设置worker状态为dead,并清理内存。
如果离线时的worker已经是dead状态,则会过一段时间才从检测列表中移除。
几个疑问点:

  1. 为什么发现worker状态为已DEAD时,要过几个周期才把worker从检测列表移除?(个人理解是防止worker恢复?但非dead离线的worker为什么是直接移除)
  2. 发现非DEAD的worker离线时,为什么是要告知driver。(个人理解是告知driver有worker异常,需要等待新的worker注册)

worker挂掉后具体发生了什么,等后面的容错机制会介绍。

worker的启动过程#

在这里插入图片描述
可以看到以下几点:

  1. 每个worker有一个独立的工作目录,用于缓存数据等
  2. 启动后同样需要反过来注册到master。
  3. 会通过metricsSystem定期上报自己的状态给master

worker如何注册到master#

在这里插入图片描述
里面黄色标签可以看到自己当时的疑问
为什么worker要先给自己发心跳,然后再转发给master。
书里没说,个人理解是为了自身能确认心跳线程是否因为异常情况导致中断,所以要先发给自己来确认心跳情况。
在这里插入图片描述
针对这个问题, 其实就是之前提到的, 如果超时了,master会把worker清除, 但后来又收到了心跳,则说明worker没挂,可能只是网络异常,此时会需要worker发送连接请求,重新注册到master

master和worker启动之后#

在这里插入图片描述
可以看到master和worker启动完成后, 会启动一个客户端专门和application通信。

master如何处理regsiterApplication#

在这里插入图片描述
关键点在于:

  1. 注册过程是为了在master这绑定appid和appadress的关系(local-clster是本地集群的,但如果是真实的集群,则会有多个appid,所以需要这个诸恶过程)

完整流程图#

在这里插入图片描述

[toc]

之前看了半本书《深入理解Spark:核心思想与源码分析》, 为什么说是半本,因为看不下去了,这本书太喜欢贴scala代码却不解释,但自己那时候不太了解如何去阅读源码,所以希望借助它学习一下。这里贴一贴学习的过程。


sparkContext初始化的内容#

在这里插入图片描述

一些问题和思考#


Q: SparkConf配置信息来自哪些?
A: spark-submit时spark-default.conf里的配置, spark-submit时写入的命令参数


Q: SparkContext必须指定哪2个内容?
A: 指定s.master部署模式和s.app.name应用名


Q:SparkContext到底是干嘛的?
A:任务执行中所有角色都要从sparkContext中获取一些信息,来进行任务的执行。比如各worker需要知道shufflerManager、blockservice之类的信息。
在这里插入图片描述

[toc]

spark的reduce过程究竟做了什么呢?我们可以看一下:
在这里插入图片描述
先拉取数据,在聚合,最后会调上一章讲map操作时的insertAll方法即缓存结果的方法。
如何做reduce聚合没啥好说的,我们看下他是怎么读取中间计算结果的

1.从BlockManage处获取map任务的状态信息#

首先会试图获取任务的状态信息,以确认当前从哪里读取数据

我们看下是从具体的获取信息过程:
在这里插入图片描述
可以看到有以下关键点:

  1. reduce的上游任务节点状态一开始是不确定的, 如果没有需要自己根据shuffleId去拉取过来。
  2. 取状态信息时,不是马上就去取,而是放入一个fetching队列里取,避免同时发送过多的零碎状态信息请求。
    注意这个时候还没有真正拿到数据,只是获取了数据的情况。

2.使用splitLocalRemoteBlocks方法 ,按照中间结果所在节点划分各个Block#

获取了上游任务节点该map数据的状态,那么接着是划分block,以确认是本地还是远程。 看下是怎么做block划分的:
在这里插入图片描述
有以下特点:

  1. 只有非自己当前节点的数据,才需要去远程获取。
  2. 每次是凑齐了一堆块的数量才去取,而不是频繁地取。

3.创建ShufleBlockFetcherIterator迭代器,从远端节点读取中间计算结果在这里插入图片描述#

接着就是要读取了,看下怎么读的:
在这里插入图片描述
有以下特点:

  1. 发送fetch请求时, 会做随机排序(因为取的数据可能来自多个节点,为了避免总是只取1个节点的,选择了随机取)

取完数据之后,就是调用之前map操作里也调用过的insertAll放去缓存或者先计算再缓存,提供给下游调用。

完整流程如下:
在这里插入图片描述