0%

[toc]

【知乎】Presto 是如何 schedule task 的?
presto查询处理流程-queryexecution提交


presto内部4种计算类型的节点#

  1. source节点,读源数据的节点,负责读取数据、Map阶段的计算任务。分配的个数由SplitManager根据数据决定。

  2. fixed节点,shuffle节点,用于处理reduce任务。比如group by计算,source阶段的数据按照hash发送到fixed节点。分配的个数由3. hash_partition_count这个session参数决定,由于是固定个数,所以对于不同的数据规模采用同样的个数也不是很合适,这也是需要改进的点。

  3. single节点,单节点计算任务,某些计算需要在单一节点计算,比如MergeReduce,output等,需要分配一个节点。

  4. coordinator only,只在coordinator节点计算的任务,一般是meta类操作。

    162390543fb35c9e5eff2ee2370e224eac411724


preosto种的split是什么?
【CSDN】presto-spilt详解


Source Stage调度过程#

由SectionExecutionFactory.createStageScheduler我们可以知道
具体的节点分配split策略的实现类为DynamicSplitPlacementPolicy。跟踪其调用栈可以发现,其具体实现为TopologyAwareNodeSelector.computeAssignments方法,其主要流程如下:

  1. 将所有活跃的工作节点作为候选节点;
  2. 如果分片的节点选择策略是HARD_AFFINITY,即分片只能在特定节点进行访问,则根据分片要求更新候选节点列表;
  3. 如果分片的节点选择策略不是HARD_AFFINITY,则根据节点的网络拓扑,从候选节点中选择和分片偏好节点网络路径最匹配的节点列表来更新候选节点列表;最重要目的是让source节点尽可能靠近数据源
  4. 使用bestNodeSplitCount方法从更新后的候选节点列表中选择最合适的节点来分配分片;

从候选节点列表中选择最优节点的算法TopologyAwareNodeSelector.bestNodeSplitCount的流程如下:

  1. 从候选节点列表中随机选择一个(传入节点列表时已进行随机打散new ResettableRandomizedIterator<>(nodes)),如果该节点已分配的Split尚未达到阈值,则选择该节点;
  2. 如果第1步选择的节点已分配Split达到上限,则选择剩余节点中在当前stage中排队Split最少的节点。

signle和fixed调度过程#

选择FixedCountScheduler策略
可以看到FixedCountScheduler只是在nodePartitionMap.getPartitionToNode返回的固定节点列表上调度任务,在选定的每个节点上相应创建一个任务。

Single和Fixed Stage的节点选择比较简单,均为随机性选择,且调用方法为同一个:NodeSelector.selectRandomNodes。

Single Stage只会随机地从所有存活节点列表中选择一个节点,Fixed Stage会随机选择参数hash_partition_count和max_tasks_per_stage两者的小值数量的节点。

Presto的查询调度本质上就是Split分配到各个节点的过程,每个阶段依据本身所承担的职责,调度方式有所区别,从整体上来说,Split分配节点的方式基本为随机选择的策略,在此基础上尽量保证每个节点处理的Split相对平均。

[toc]

https://cloud.tencent.com/developer/article/1656529
在 Presto 中,我们需要了解一些非常重要的数据结构,例如,Slice,Block 以及 Page,下面将介绍这些数据结构。


数据模型#

presto采取三层表结构:

  • catalog 对应某一类数据源,例如hive的数据,或mysql的数据
  • schema 对应mysql中的数据库
  • table 对应mysql中的表
    8071b1f7aaf4aa7d290a7d247ad6ca400d4ab6ca

presto的存储单元:#

  1. Page: 多行数据的集合,包含多个列的数据,内部仅提供逻辑行,实际以列式存储。
  2. Block:一列数据,根据不同类型的数据,通常采取不同的编码方式,了解这些编码方式,有助于自己的存储系统对接presto。

不同类型的block:#

  1. array类型block,应用于固定宽度的类型,例如int,long,double。block由两部分组成

    boolean valueIsNull[]表示每一行是否有值。

    T values[] 每一行的具体值。

  2. 可变宽度的block,应用于string类数据,由三部分信息组成

  • Slice : 所有行的数据拼接起来的字符串。
  • int offsets[] :每一行数据的起始便宜位置。每一行的长度等于下一行的起始便宜减去当前行的起始便宜。
  • boolean valueIsNull[] 表示某一行是否有值。如果有某一行无值,那么这一行的便宜量等于上一行的偏移量。
  1. 固定宽度的string类型的block,所有行的数据拼接成一长串Slice,每一行的长度固定。
  2. 字典block:对于某些列,distinct值较少,适合使用字典保存。主要有两部分组成:
  • 字典,可以是任意一种类型的block(甚至可以嵌套一个字典block),block中的每一行按照顺序排序编号。
  • int ids[] 表示每一行数据对应的value在字典中的编号。在查找时,首先找到某一行的id,然后到字典中获取真实的值。

Slice#

从用户的角度来看,Slice 是一个对开发人员更友好的虚拟内存,它定义了一组 getter 和 setter 方法,因此我们可以像使用结构化数据一样使用内
Slice 常用来表示一个字符串:
// use it as utf8 encoded string
Slice slice = Slices.utf8Slice(“hello”);
Slice subSlice = SliceUtf8.substring(slice, 1, 2);
我们可以像使用字符串一样使用 Slice,Presto 为什么选择 Slice 而不是 String:
字符串创建代价昂贵(字符串拼接,StringBuilder等)。
Slice 是可变的,而 String 是不可变的,因此当我们需要进行字符串计算时,效率更高。
字符串在内存中编码为 UTF16,而 Slice 使用 UTF8,这样可以提高内存效率。UTF16 最少使用两个字节来表示一个字符,而 UTF8 最少使用一个字节,因此,如果 String 内容主要是 ASCII 字符,则 UTF8 可以节省大量内存。
Slice(在 Presto 中)的另一种用法是表示原始字节(SQL中的 VARBINARY 类型):
// use it as raw bytes
block.getSlice().getBytes()

Block#

由于 Page 由 Block 组成,因此我们首先介绍 Block。Block 可以认为是同一类数据(int,long,Slice等)的数组。每个数据项都有一个 position,总位置个数代表 Block 中数据的总行数(Block 仅保存这些行中的一列)
Block 定义了好几套 API,其中一个是 getXXX 方法,让我们以 getInt 为例:
/**
* Gets a little endian int at {@code offset} in the value at {@code position}.
*/
default int getInt(int position, int offset) {
throw new UnsupportedOperationException(getClass().getName());
}
通常,一个 Block 仅支持一种 getXxx 方法,因为一个 Block 中的数据都来自同一列,并且具有相同的类型。
Block 定义的另一个方法是 copyPositions,来代替从 Block 中获取某个值,通过返回一个新的 Block 来从指定的位置列表获取一组值:

1
2
3
4
5
6
7
8
9
10
11
/**

- Returns a block containing the specified positions.

- All specified positions must be valid for this block.

- The returned block must be a compact representation of the original block.

*/

Block copyPositions(List<Integer> positions);
  • Presto 还定义了 BlockEncoding,定义了如何对 Block 进行序列化和反序列化:
1
2
3
4
5
6
public interface BlockEncoding {
/**
\* Read a block from the specified input. The returned
\* block should begin at the specified position.
*/
Block readBlock(SliceInput input);
1
2
3
4
/**
* Write the specified block to the specified output
*/
void writeBlock(SliceOutput sliceOutput, Block block);

}
我们以最简单的 BlockEncoding:IntArrayBlockEncoding 为例,其 readBlock 如下所示:

1
2
3
4
5
6
7
int positionCount = block.getPositionCount();
sliceOutput.appendInt(positionCount);
encodeNullsAsBits(sliceOutput, block);
for (int position = 0; position < positionCount; position++) {
if (!block.isNull(position)) {
sliceOutput.writeInt(block.getInt(position, 0));
}

Page#

Page 由不同的 Block 组成:

1
2
3
4
5
public class Page {
private final Block[] blocks;
private final int positionCount;
...
}

除 Block 外,Page 还有另一个称为 Channel 的概念:每个 Block 都是该 Page 的 Channel,Block 的总数就是 Channel 数。因此,让我们在这里总结一下数据是如何结构化的,当要发送一些行时,Presto 将:
将每一列放入单独的 Block 中。
将这些 Block 放入一个 Page 中。
发送 Page。
Page 是保存数据并在 Presto 物理执行算子之间传输的数据结构:上游算子通过 getOutput() 产生输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**

- Gets an output page from the operator. If no output data is currently

- available, return null.

*/

Page getOutput();

下游算子通过 addInput() 方法获取输入:

/**

- Adds an input page to the operator. This method will only be called if

- {@code needsInput()} returns true.

*/

void addInput(Page page);
  • 就像 Block 一样,Page 也需要序列化和反序列化,序列化发生在工作进程之间传输数据时。Page 进行序列化时,首先使用相应的 BlockEncoding 对 Block 进行编码。如果有压缩器,将尝试对编码的块数据进行压缩,如果压缩效果良好(编码率低于0.8),将使用压缩数据,否则使用未压缩的数据。编码后的块数据将与一些统计信息(压缩前后页面的字节大小)一起放入名为 SerializedPage 的类中。

[toc]

https://blog.csdn.net/gv7lzb0y87u7c/article/details/81049861?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_baidulandingword~default-0.no_search_link&spm=1001.2101.3001.4242
https://blog.csdn.net/sjtuyunlei/article/details/90369926
https://zhuanlan.zhihu.com/p/101366898


presto内存空间分配#

3c99c38c98ef9fe816460b6f44c8f27cbc7d8be3

  • System Pool 是用来保留给系统使用的,默认为40%的内存空间留给系统使用。
  • Reserved Pool和General Pool 是用来分配query运行时内存的。
  • 其中大部分的query使用general Pool。 而最大的一个query,使用Reserved Pool, 所以Reserved Pool的空间等同于一个query在一个机器上运行使用的最大空间大小,默认是10%的空间。
  • General则享有除了System Pool和General Pool之外的其他内存空间。
  • 在真正执行物理计划前,内存需求都来自于systemMemoryPool,包括临时数据结构,传输buffer等
  • 执行物理计划时,不同的Operator类型都根据需要申请内存,比如aggregationOperator使用getEsctimatedSize()方法预估需要的内存。这里获取的内存来自于reservatedMemoryPool或者generalMemoryPool,究竟使用哪个pool取决于当前查询是否耗用内存最大。

为什么要引出一个Reserved内存池且只提供给1个作业使用?#

如果没有Reserved Pool, 那么当query非常多,并且把内存空间几乎快要占完的时候,某一个内存消耗比较大的query开始运行。
但是这时候已经没有内存空间可供这个query运行了,这个query一直处于挂起状态,一直在等待可用的内存。
但是其他的小内存query跑完后, 可能只腾出一点点的空间, 又有新的小内存query加进来。由于小内存query占用内存小,很容易找到可用内存。 这种情况下,大内存query就一直挂起直到饿死。
所以为了防止出现这种饿死的情况,必须预留出来一块空间,共大内存query运行。 预留的空间大小等于query允许使用的最大内存。Presto每秒钟,挑出来一个内存占用最大的query,允许它使用reserved pool,避免一直没有可用内存供该query运行。

保留池选举机制#

c19a007e8eb9e99042e4472b8f07224adc7542ce
Presto内存管理,分两部分:

  1. query内存管理
  • query划分成很多task, 每个task会有一个线程循环获取task的状态,包括task所用内存。汇总成query所用内存。
  • 如果query的汇总内存超过一定大小,则强制终止该query。
  1. 机器内存管理
  • coordinator有一个线程,定时的轮训每台机器,查看当前的机器内存状态。

当query内存和机器内存汇总之后,coordinator会挑选出一个内存使用最大的query,分配给Reserved Pool。
内存管理是由coordinator来管理的, coordinator每秒钟做一次判断,指定某个query在所有的机器上都能使用reserved 内存。

如果某台机器上,,没有运行该query,那岂不是该机器预留的内存浪费了?为什么不在单台机器上挑出来一个最大的task执行?#

原因还是死锁,假如query,在其他机器上享有reserved内存,很快执行结束。但是在某一台机器上不是最大的task(即这个task在另一个节点可能只排第二名,被另一个大作业占了保留池, 导致下一步卡住了,无法连续的执行),一直得不到运行,导致该query无法结束。
所以首要目的是保证 此刻已感知到的最大作业尽快执行完毕。

空间不足时杀query的逻辑#

https://blog.csdn.net/eagooqi/article/details/109595271
每次作业提交存在会话级别的配置
query_max_memory 本次查询规定的最大内存
在轮询过程种如果发现内存超出本次查询上限内存, 会杀掉这个query。
还有个会话配置resource_overcommit
如果设为true,后面即使内存暂时超出单作业规定内存,业不会被杀掉
但如果整个集群的内存出现不足,他仍然会被杀掉
集群内存不足的判定:
存在某个worker节点的内存池出现内存不足(即该节点阻塞了),则认为发生了内存溢出
4aa0a7e4705482811c6e8cd7f04600b2630bef4d
411d7fe11648d3c5e9f2af25d938180a68030c85
fbbd774b6c247a57e2b7ec446e5d2faa5003152b

[toc]

参考资料:
https://www.gameres.com/883796.html
https://zhuanlan.zhihu.com/p/101366898
https://blog.csdn.net/anghiking20140716/article/details/101312055


presto高可用原理图#

4eb71e56e2bac3dfe405310f0acfbede81b37634

presto高可用配置#

在worker的配置中,可以选择配置:

  • discovery的ip:port。

  • 一个http地址,内容是service inventory,包含discovery地址。

  • 一个本地文件地址

    2和3的原理是基于service inventory, worker 会动态监听这个文件,如果有变化,load出最新的配置,指向最新的discovery节点。

    在设计上,discovery和coordinator都是单节点。如果有多个coordinator同时存活,worker 会随机的向其中一个汇报进程和task状态,导致脑裂。调度query时有可能会发生死锁。

    discovery和coordinator可用性设计:

    由于service inventory的使用,监控程序可以在发现discovery挂掉后,修改service inventory中的内容,指向备机的discovery,无缝的完成切换。

    coordiantor的配置必须要在进程启动时指定,同一个集群中无法存活多个coordinator。因此最好的办法是和discovery配置到一台机器。 secondary机器部署备用的discovery和coordinator。在平时,secondary机器是一个只包含一台机器的集群,在primary宕机时,worker的心跳瞬间切换到secondary。


presto高可用解决方案选型#

Presto的高可用主要解决的是Coordinator/Discovery Server的单点问题。

Presto高可用的解决有很多种方案,比如用HAProxy+Keepalived或者云服务商提供的lb这样的外部组件,但问题是这样就需要用VRRP这类虚拟IP来解决单点问题,而我们尽量要做到集群内部组件不依赖外部组件,这样从架构上来说,系统复杂度最低,也最容易维护。
综上,我们需要修改Presto底层源码来解决。
Presto的discovery其实就是airlift discovery,通过阅读源码,发现其内部是支持HA的,只是Presto的官方文档上没有展示出来。


presto on yarn#

presto on yarn是一种动态的运行策略,在yarn上面,哪个节点运行presto的coordinator和worker是不确定的,这会给外部调用presto的程序带来困扰

外部的程序和presto的交流一般是通过presto提供的客户端来调用,而它的客户端需要事先知道presto的coordinator地址,在presto on yarn的情况下,coordinator的地址是不确定的,有可能会发生变化。

这种情况下的处理方案是:将presto的服务发现方案外置,将presto的服务发现服务独立于presto的coordinator运行,将presto的coordinator和worker中的discovery.uri配置成外部独立的发现服务地址,在外部提供具有HA的服务发现,提供稳定的发现服务。

Presto的服务发现是基于airlift的服务发现做的实现,airlift的服务发现可以在这里查看实现和源码,不过它基本是处于无文档的状态,所以理解要多花些功夫。

airlift的服务发现的总体思路是基于http提供一个提供服务发现的HA集群,集群之间通过http通信,通过数据同步方式,提供最终一致性的保证。

看完赛博朋克, 为什么后劲很足, 到了周二还在看相关的衍生视频呢?

因为你会意识到整部作品从主角装上斯维坦因的时候,就已经是注定的悲剧了。

每个人为了不断变强,让自己装上超出身体承受上限的义肢, 是否意味着为了 残酷的竞争,而不断给自己强加的压力呢?

为了缓解压力, 即使不得不吃起精神类药物,也不肯让自己的事业有一丝丝的下落, 仿佛一旦下落,自己努力的意义便不复存在。

然而公司或者资本不会在乎你的压力,当需要你的价值时,就会各种条件去诱惑你,当你的价值榨干完毕,便会立刻抛弃。

即使你想要反抗, 也会发现 像亚当重锤那样的存在, 是你无法企及的,在当前压力永远无法突破的。

男主为了妈妈的梦想,即使已经很拮据了, 也还是强忍着不适凑合上学, 错了吗?

男主为了露西的月球梦,拼命工作,错了吗?

也许让对方能过得幸福,本身也是他们的愿望,但是都在以为对方好的目的互相各自伤害着。

露西故意瞒着大卫,为他排除田中记忆的追查, 却不肯真正地表达,。

大卫怎么做才是对的呢? 他如果发现不能实现对露西的承诺,他会很难受,很痛苦,因为那是他许下的承诺。 露西如果能清晰地表达说: 月球不重要,我已经有了更想要的东西了,是不是更好?

[toc]

Q: kafka为什么那么快?底层原理是什么?#

A:

  • Cache Filesystem Cache PageCache缓存
  • 顺序写 由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。
  • Zero-copy 零拷技术减少拷贝次数
  • Batching of Messages 批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。
  • Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。

Q: kafka的message格式是什么样的?#

A:
一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成

  • header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。

  • 当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,

  • 比如是否压缩、压缩格式等等);如果magic的值为0,那么不存在attributes属性

  • body是由N个字节构成的一个消息体,包含了具体的key/value消息

    72e821130dc3997f28f34e6898927a9c7caa73a2

    kafka的消息格式其实具有演变:一文看懂Kafka消息格式的演变


Q: kafka如何实现延迟队列?#

A:
Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。
https://blog.csdn.net/u013256816/article/details/80697456

[toc]

Q: 什么是partition的预写式日志?#

A:
Kafka中主题的每个Partition有一个预写式日志文件
每个Partition都由一系列有序的、不可变的消息组成
这些消息被连续的追加到Partition中,Partition中的每个消息都有一个连续的序列号叫做offset, 确定它在分区日志中唯一的位置。
a29e519d65d4c1a270b401545ee4d9e63b504c7f


Kafka每个topic的partition有N个副本,其中N是topic的复制因子
如下图
d68a1db5ac0df3e30fb8292b3a0f0527db563d39



Q: kafka如何实现故障自动转移?#

A:
如果leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。
Kafka确保从同步副本列表中选举一个副本为leader。消费者去新的leader进行数据读取。


Q: leader和flower如何同步数据?#

A:
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。
事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率(写入的响应)。
而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入预写log就被认为已经commit,写请求可以返回。 然后让follower异步、被动、定期地去复制leader上的数据


Q: HW和LEO是什么?#

A:
LEO是log end offset, 预写日志最后的偏移
HW是HighWatermark,所有副本的水位线,是leader来确定的。
取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置
c31d90b070c371350f63ca038d3911c36a877420


Q: 如果某个follower拉取同步太慢, 远远落后leader,会怎么样?#

A:
follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。
AR=ISR+OSR。
所有的副本(replicas)统称为Assigned Replicas,即AR
ISR:In-Sync Replicas 副本同步队列


Q: 为什么同步可能会慢?#

A:

  • 慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。
  • 卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。
  • 新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志

Q: 怎么确定多慢要剔除?#

A:
replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度,
当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR


Q: 详细描述一下当producer生产消息至broker后,ISR以及HW和LEO的流转过程#

A:
9ff6373f13df967a81d9577c7555964d68291fa6


Q: leader怎么知道其他follower的同步进度?#

A:
可以定时检测, follower来拉取时,要携带自己的同步进度偏移的。


Q: ISR怎么反馈和存储?#

A:
Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个Zookeeper的节点进行维护:

  • Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。
  • leader来维护:leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。

Q:这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,会怎么样?#

A:
两者之间的HW同步有一个间隙,B在同步A中的消息之后需要再一轮的FetchRequest/FetchResponse才能更新自身的HW为5。

  • 如果在更新HW之前,B宕机了,那么B在重启之后会根据之前HW位置进行日志截断,这样便会将4这条消息截断,然后再向A发送请求拉取消息。

  • 此时若A再宕机,那么B就会被选举为新的leader。

    B恢复之后会成为follower,由于follower副本的HW不能比leader副本的HW高,所以还会做一次日志截断,以此将HW调整为4。这样一来4这条数据就丢失了(就算A不能恢复,这条数据也同样丢失了)。

    7ed2df0388480852c93f04f2bf44553f30f249d2

    会丢失一部分数据,但概率比较低, 一般一定有最近的一个follower同步过去。


Q: 从leader的磁盘数据同步到follower,借助了什么机制加快了传输效率?#

A:
充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能
直接从磁盘到网络,避免中间buffer。


Q: 生产者向broker发数据, ack的返回值有什么含义?#

A:

  • 1(默认) 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。
  • 0 生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • -1 producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中所有Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了。

Q: 为什么选择主写读写, 而不是读写分离?#

A:

  1. 数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
  2. 延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

参考链接:
https://www.cnblogs.com/aidodoo/p/8873163.html
https://blog.csdn.net/kwame211/article/details/107402104

[toc]

它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分
主要用于处理流式数据。

Q: kafka中有哪些关键角色?#

A:

  • Kafka对消息保存时根据Topic进行归类,Topic可以理解为一个队列
  • 发送消息者称为Producer
  • 消息接受者称为Consumer
  • 此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
  • 无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
  • Partition:一个topic可以分为多个partition,每个partition是一个有序的队列
  • Offset:kafka的存储文件都是按照offset.kafka来命名。你想找位于2049的位置,只要找到2048.kafka的文件即可
  • ISR:In-Sync Replicas 副本同步队列
  • AR:Assigned Replicas 所有副本

Q: 详细解释下broker?#

A:
Producers(生产者程序)往Brokers里面的指定Topic中写消息
Consumers(消费者程序)从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。


Q: zk在kafka中的作用?#

早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值
zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。
Zookeeper保存kafka的集群状态信息的,包括每个broker,为什么?
因为zk和broker建立监听,一旦有一个broker宕机了,另一个备份就可以变为领导,第二,zk保存消费者的消费信息,为什么要保存?就是为了消费者下一次再次消费可以得知offset这个偏移量,consumer信息高版本在本地维护
1


Q: kafka中consumer group 是什么概念#

A:
对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费
worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)

  • 同一个消费者组里面不能是同时消费者消费消息,只能有一个消费者去消费
  • 同一个消费者组里面是不会重复消费消息的
  • 同一个消费者组的一个消费者不是以一条一条数据为单元的,是以分区为单元,就相当于消费者和分区建立某种socket,进行传输数据,所以,一旦建立这个关系,这个分区的内容只能是由这个消费者消费

Q: 怎么实现消费者消费不同的数据?#

A: 将消费者放在同一组取数据


Q: 生产环境一般要求消费者消费的数据一样且多个,比如一个写到hdfs,一个放到spark计算,怎么做#

A:
这样就得要求相同数据拷贝,放到不同的消费者组里, 各消费者在不同的消费组里进行消费。


Q: 为什么说kafka是分布式的?#

A:

  • 同一个topic又拥有不同的分区,不同的分区可以分布在不同的borker上也就是不同的机子上
  • 消费者组里的消费者可以在不同的机器上,有什么好处?消费的方式可以是存储可以是计算,如果是放在一台机子上,Io等压力很大
  • kafka上面的所有想到的角色都是分布式的,不管是消费者还是生产者还是分区,他们之间沟通的唯一桥梁就是zookeeper

Q: 项目选型kafka的理由?#

A:
kafka:适合数据下游消费众多的情况;适合数据安全性要求较高的操作,支持replication。为什么适合数据下游消费众多?因为有就算有多个消费者,kafka里面存的数据是一样的,不会再增加副本。
在实际生产应用中,一般会使用kafka做为消息传输的数据管道
rabbitmq做为交易数据做为数据传输管道
主要的取舍因素则是是否存在丢数据的可能;
rabbitmq在金融场景中常用,具备较高的严谨性,数据丢失的可能性更小,同事具有更高的实时性;
而kafka优点主要体如今吞吐量上,虽然能够经过策略实现数据不丢失,但从严谨性角度来说,大不如rabbitmq;并且因为kafka保证每条消息最少送达一次,有较小的几率会出现数据重复发送的状况;

  • 吞吐量较低:Kafka和RabbitMQ都可以。:
  • 吞吐量高:Kafka。

flume 适合多个生产者;适合下游数据消费者不多的情况


Q: kafka一般的应用场景?#

A:
线上数据 --> flume --> kafka --> HDFS:所以一般情况下,企业用flume收集日志数据,然后下游sink选择kafka


架构图如下:
24f762e635f83413edc0b674a62e264556511e1c

[toc]


Q: 单纯的redis主从模式有什么缺点?#

A:
单纯的读写分离,无法应对大规范的请求访问。

  • 读的话可以通过主-从-从-从实现多节点读,但是同步的过程会导致不一致。
  • 主从模式,内存里能存储的缓存大小也有限
  • 只能主节点写, 写能力有限

Q:redis集群,集群中每个节点是存储了相同的数据吗?#

A:
不是的, 采用是类似数据库水平扩容的方式。(主从模式才是数据完全相同)
通过响应的分片算法, 每个redis节点处理的key是不同的,保证每个key映射到唯一的redis节点。


Q:客户端怎么知道数据分布到哪个节点上?难道需要每个客户端自己根据集群信息+分片算法进行计算吗?#

A:
客户端在初始化的时候只需要知道一个节点的地址即可
客户端会先尝试向这个节点执行命令
如果key所在的slot刚好在该节点上,则能够直接执行成功。
如果slot不在该节点,则节点会返回MOVED错误,并告知实际节点ip
换句话说, redis集群中每个节点都知道实际的slot分布,并会告知客户端真实地址进行重定向


Q: 每次都要MOVED,那岂不是每次都多访问了1次?#

A:
当客户端拿到了MOVED信息的响应后,可通过cluster nodes命令获取整个数据分布表,并缓存在客户端内存中。 这样每次就能根据分布表每次请求到正确的节点
一旦数据分布表发生变化,请求到错误的节点,返回MOVED信息后,重新重新执行cluster nodes命令更新数据分布表。


Q: 除了返回MOVED,还有可能可能返回ASK,这2个的区别是什么?#

A:
ASK这种错误是在key对应的slot正在进行数据迁移时产生的。
这时候向slot的原节点访问,如果key在迁移源节点上,则该次命令能直接执行。
如果key不在迁移源节点上,则会返回ASK错误,描述信息会附上迁移目的节点的地址。
客户端这时候要先向迁移目的节点发送ASKING命令,然后执行之前的命令。


Q: 每次客户端代码都要这样自己写一堆逻辑吗?#

A:
这些细节一般都会被redis客户端sdk封装起来,使用者完全感受不到访问的是集群还是单节点。即不需要使用者来写这块的重定向、缓存分布表等逻辑。
类似的有JedisCluster。
09f75652c2947c595051504c1986b159563dde7f


Q: 那redis集群的分片算法是怎样的,哈希后按照节点数量取余么还是?#

A:
Redis-cluster中有16384(即2的14次方)个哈希槽。
每个key通过CRC16校验生成一个数字后, 对16383取模来决定放置哪个槽。
Cluster中的每个节点负责一部分hash槽(hash slot)。
例如节点A负责0到5000, 节点B负责5001-10000,节点C负责10000-16384.


Q: 为什么只有16384个槽?会不会太少了? 一致性哈希算法是2^16的圈长度#

A:
16384个槽时,只需要16k(压缩后2k)来存和传输槽信息
65535时,需要65k(压缩后8k)
而redis作者认为redis集群一般不会有超过1000个master节点。


Q: 如果某个redis节点挂了, 是不是马上就要重新分配哈希槽了?#

A:
不是的,每个redis-cluster节点本身都是主从双活, 即有5个cluster节点的话,就有5个主和5个从(甚至更多从), 来尽可能让哈希槽不要重新分配。


Q: 那么什么情况下,会进行哈希槽的重新分配?#

A:

  • 有新的redis-master节点加入(新增从节点不影响分配)
  • 某组节点要统一下线,即主从都不在了
  • 检测到负载不均匀,需要调整,例如均分成0-5000、5000-10000、10000、16384时, 发现0-5000比其他2个节点的压力多太多,则调小0-5000,分配给其他节点。

Q:新增一个节点时, 扩容的过程是怎么样的?#

A:

  1. 当需要将新节点加入到集群中时, 可以通过管理redis集群的客户端执行“cluster meet 新节点ip:端口”命令,或者通过“redis-trib add node”命令添加新节点

    新添加的节点默认在集群中都是主节点

  2. 客户端向节点B发送状态变更命令,将B的对应slot状态置为IMPORTING

  3. 客户端向节点A发送状态变更命令,将A的对应slot状态置为MIGRATING

  4. 客户端针对A的slot上的所有的key,分别向A发送MIGRATE命令,告知A将对应key的数据迁移到B。

  5. 客户端向集群所有主节点广播槽(数据)全部迁移到了目标节点

    7c5e44925851728deab7eb381408ec94057480a6

  • 注意这个功能是由客户端完成,而非集群内自动完成。
  • redis-trib工具做数据迁移

Q: 刚才提到每个redis集群节点中存了实际的哈希槽分布, 这个是怎么传输的,还是说他们都有全局信息进行自己计算?#

A:
通过gossip协议实现信息交换。

  • 节点每秒会向集群中其他节点发送 ping 消息,消息中带有自己已知的两个节点的地址、槽、状态信息、最后一次通信时间等。(肯定

  • 节点收到 ping 消息后会回复 pong 消息,消息中同样带有自己已知的两个节点信息。

    注意每次消息里只有2个已知的信息。

换句话说, 经过一番杂乱无章的通信,最终所有节点的状态都会达成一致。每个节点可能知道所有其他节点,也可能仅知道几个邻居节点,只要这些节可以通过网络连通,最终他们的状态都是一致的


Q: 但集群最开始的阶段,只有自己的信息,怎么知道给谁发ping消息呢?#

A:
通过MEET消息进行最初的建立。

  • redis的集群管理员给第一个redis集群发送“CLUSTER MEET ip port”命令, 然后就能建立2个之间的连接。
  • 后面逐步添加时,给任意一个节点发送meet即可。 他们会通过之前提到的ping+pong机制乱序地建立关联
  • 可以先建立2个集群, 再通过MEET进行集群连接,合并成一个集群。(有点像并查集)

Q: 刚才只提到了节点加入, 那如果有一个主从节点挂了或者下线, 也是要管理员自己检测然后下线吗?#

A:
节点下线问题不能全依赖管理员,因为一旦因为异常情况主从节点都挂了,会导致对应key不可用,必须尽快响应。

  1. 当节点A ping不通某节点B后,标记为“疑似下线”状态, 会继续用ping命令向集群其他节点同步这个疑似下线的状态
  2. 节点C收到消息后,把B节点纳入到fail_reports链表中。并同样发送FAIL消息给其他节点。
  3. 当越发越多,最终有一半以上的节点收到B节点下线的消息时,节点B将被标记成真正下线。
  4. 标记B下线的这个节点E会直接用FAIL消息进行广播, 而不再慢慢通过ping同步。

Q: E节点怎么知道超过了半数?他难道有全局的视野吗?#

A:
当E受到的来自其他master节点的B:PFAIL消息达到一定数量后,会将B的PFAIL升级为FAIL状态,表示B已确认为故障,后续将会发起slave选举流程
换句话说, 当E收到B的PFAIL后,会维护一个fail_reports链表, 记录一下是谁认为B离线了。
当收到的B离线报告越来越多,且人数超过了一半,E就认为可以宣布B真正离线了


Q: 为什么要一半以上下线,才真正认为下线? 直接广播不行么?#

A: 因为有可能是某个节点自己网络故障,导致不通。
当他发了疑似下线的状态给别的节点时, 别的节点也许能ping通,那么就会覆盖他的疑似下线状态。
所以需要半数选举,如果节点真的挂了, 那么就不会存在状态覆盖,多次ping同步中肯定会超过半数。


Q: 发ping时的原理讲一下?比如怎么选择给谁发ping, 隔多久发一次?#

A:

  • 每x秒从所有已知节点中随机选取5个,向其中上次收到pong最久远的一个发送ping
  • 如果有未建立连接,但是从别人那里收到过信息的节点,则有限ping或者meet一下。

Q: 心跳中包含了哪些内容?#

A:

  1. Header部分,发送者自己的信息
  • 所负责slots的信息
  • 主从信息
  • ip+port信息
  • 自己的状态信息
  1. Gossip,发送者所了解的部分其他节点的信息
  • ping_sent, pong_received
  • ip, port信息
  • 状态信息,比如发送者认为该节点已经不可达,会在状态信息中标记其为PFAIL或FAIL

Q: gossip的同步过程可以看到是很混乱的,万一redis-master节点先收到了新的slot信息后, 再收到了旧的slot信息,怎么办?#

A:
Redis Cluster 使用了类似于 Raft 算法 term(任期)的概念称为 epoch(纪元),用来给事件增加版本号。
每当收到ping里的slots信息, 会判断发送者声明的slots信息,跟本地记录的是否有不同

  • 如果不同,且发送者epoch较大,更新本地记录
  • 如果不同,且发送者epoch小,发送Update信息通知发送者

Q: cluster bus是什么?#

A:
中文名词 集群总线。其实就是特定的集群端口,专用于gossip协议交互。
Redis Cluster Bus通过单独的端口进行连接
bus是节点间的内部通信机制,交互的是字节序列化信息,而不是client到Redis服务器的字符序列化以提升交互效率。


Q: redis集群中可以使用publish吗?为什么?#

A:

  • 在集群模式下,所有的publish命令都会向所有节点(包括从节点)进行广播
  • 这会造成每条publish数据都会在集群内所有节点传播一次,加重了带宽负担

换言之, 如果要用到publish操作,建议换一个redis集群专门做publish, 不要去占用做缓存作用的redis集群带宽。

[toc]


Q: 主节点如果挂了,就无法响应写操作了,那么redis如何检测到主节点离线或者挂了?#

A:

  1. redis有多个哨兵进程,共同监控主从节点的状态。
  2. 当某个哨兵发现主节点连不上了,则会进行“主观下线”,即他个人认为主节点掉了。
  3. 接着这个最先发现主节点掉线的哨兵,会马上发送is-master-down-by-addr给其他哨兵, 让其他哨兵帮忙确认一下。
  4. 其他哨兵同时做一下检测,确认不通时,就会给出赞同票。
  5. 当赞同票大于quorum 配置,则认为主节点是“客观下线”了

Q: 讲了那么多, 什么是哨兵啊?是某个redis进程里带的一个独立模块吗?#

A:

  • Sentinel 其实是一个 redis 服务端程序,只不过运行在特殊的模式下,不提供数据存储服务,只进行普通 redis 节点监控管理。
  • 它也会定时执行 serverCron 函数,只是里面其他的程序用不到,用到的是对普通 redis 节点的监控以及故障转移模块。

Q: 为什么要用多个哨兵进程进行监控,而不用一个监控进行监控?#

A:

  • 一个监控程序不可靠, 如果1个监控进程挂了,那么检测机制就失效了
  • 一个监控程序有一个特定的检查间隔, 而哨兵机制可以多个分时间段进行检测。当最先发现下线的人,触发投票过程即可

Q:哨兵进程怎么知道其他哨兵进程的存在?#

A:
在主从集群中,主库上有一个名为__sentinel__:hello的频道,不同哨兵就是通过它来相互发现,实现互相通信的

  1. 哨兵 1 把自己的 IP(172.16.19.3)和端口(26579)发布到__sentinel__:hello频道上

  2. 哨兵 2 和 3 订阅了该频道。那么此时,哨兵 2 和 3 就可以从这个频道直接获取哨兵 1 的 IP 地址和端口号。

  3. 然后,哨兵 2、3 可以和哨兵 1 建立网络连接。

    ef6dc1286e092b2616025b31d24c869762345209


Q: 哨兵怎么对主节点继续检测的?#

A:
由哨兵向主库发送 INFO 命令来完成的
,哨兵 2 给主库发送 INFO 命令,主库接受到这个命令后,就会把从库列表返回给哨兵,实时更新。


Q: 当确认主节点客观下线后, 由哪个哨兵来负责通知客户端或者应用服务进行主节点ip的切换?#

A:
需要选一个主的哨兵节点。
Raft选举算法: 选举的票数大于等于num(sentinels)/2+1时,将成为领导者,如果没有超过,继续选举


Q: 什么情况下无法完成主从切换?#

A:
哨兵进程挂掉太多。
假设有5个哨兵进程, 1个哨兵必须拿到超过多数的选票(5/2+1=3票)。
但目前只有2个哨兵活着,无论怎么投票,一个哨兵最多只能拿到2票,永远无法达到N/2+1选票的结果。因此无法完成主从切换。


Q: 从库那么多,怎么选择一个新的主库?#

A:

  1. 剔除和其他哨兵不通的从节点
  2. 选择salve-priority配置中优先级最高的从节点
  3. 选择偏移复制量最大的,即复制最完整的。

Q: 选定主库后, 故障转移的流程是怎么样的?#

A:

  1. 主哨兵通知新主节点, 让他把自己升级主节点,不再需要同步别人的数据。
  2. 通知其他的从节点更新主节点ip
  3. 通知客户端或者应用服务,主节点已经更换。
  4. 原先掉线的主节点如果上线了,会被通知成为从节点。