Kafka技术内幕

购买链接
Kafka入门.pdf — gitee

Kafka 流式数据平台

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 0.10 之前 => MQ
Kafka 消息系统:应用解耦、异步解耦、异步消息、流量削峰等

# 0.10 之后 => MQ + Stream
Kafka 提供了连接器与流处理能力,分布式消息系统 => 流式的数据平台

# 流式数据平台 => 3个特点
1. 类似消息系统,事件流的发布和订阅(数据注入)
2. 储存事件流数据,并可以故障容错(数据储存)
3. 实时事件的流式处理和分析(流处理)

# Kafka 是如何实现上面三个功能的?
# 消息系统
=> 1p->多c,1p->多g
=> 也叫作消息队列,简称 MQ
=> Kafka 使用消息组(consumer group)统一了两种消息模型(见下图)
=> 多个消费组结合多个消费者,既可扩展消息处理能力,也可允许消息被多个消费组订阅
1. 队列模式(也叫点对点模式)
消息平均分配给消费组中的消费者
=> 多个消费组读取消息队列,每条消息只发给一个消费者(相同消费者组)
2. 发布/订阅模式(pub/sub)
消息广播给多个消费组
=> 多个消费者订阅主题,主题的每条记录会发布给所有消费者(不同消费者组)

# 存储系统
发布、消费要解耦,要存储未消费的数据。否则,在内存中,鸡掰了,消息就丢失了。
Kafka 在写入集群的服务器节点时,还会复制多份,保证高可用。
为了可靠存储,允许生产者收到应答之前,一直阻塞,直到完全复制多个节点,才认为写入成功。

# 流处理系统
流式数据平台仅仅有消息的写入,读取,储存还不行,还要有实时的流式处理能力。
简单的,可以直接使用 Kafka 生产者和消费者 API 完成。
复杂的,可以使用 Kafka Streams 的流处理 API,比如:聚合,连接,各种转换。
它还会解决:乱序、延迟、重输入、窗口、状态、等

# 将上面三个系统组合在一起 => Kafka 流式数据平台
传统消息系统通常只会处理订阅后的消息,无法处理订阅前的历史数据。
分布式文件存储系统,一般存静态历史数据,对历史数据一般采用批处理。
Kafka 既能处理实时数据,也可处理历史数据。
Kafka 流式处理的 4种核心组件(见下图):
1. 生产者(producer)=> 客户端 发布事件流到 Kafka的一个或多个 Topic
2. 消费者(consumer)=> 客户端 订阅 Kafka的一个或多个 Topic,并处理事件流
3. 连接器(connector)=> 将 Topic 和已有数据源连接,数据可以相互导入、导出
4. 流处理(processor)=> 从 Kafka 输入Topic 消费,经过处理,再发送到 输出Topic(in -> map -> out)
Kafka 的流式数据管道,不仅低延迟,还保证数据存储的可靠性。
与离线系统集成,将 Kafka的数据加载到批处理系统,保证数据不遗漏。
Kafka 集群某些节点鸡掰,集群仍是可用的。

在这里插入图片描述
在这里插入图片描述

Kafka 的基本概念

1
2
3
4
5
6
7
8
9
10
11
下面从 3个角度分析 Kafka的几个基本概念:

# 1. Kafka Topic、Partition 内部如何存储?有什么特点?
=> 1t -> 多p,1主多从,不同节点,(时间顺序,从0开始,唯一)

# 2. Kafka的消费模型与传统 MQ相比,有什么特点?
=> 一些 MQ => 立即删除
=> Kafka => 过期删除(日志),pull consumer记录状态:消费旧的、跳到当前

# 3. Kafka 如何实现分布式 存储与读取?
=> 1t多p,p c均衡,p最小并行

分区模型

1
2
3
4
5
6
7
8
9
10
11
12
13
集群 => 多节点(broker server)
消息 => topic 分类(不同类型消息 => 不同 topic)
=> 1t多c(同组负载均衡,不同组广播)。

1t多p(partition日志文件)
p => 有序的、不可变的记录序列,新消息追加到日志。
offset => 按时间单调递增,唯一定位分区中的每一条消息

如下图 1-3 (左)所示,主题有 3 个分区,每个分区的偏移量都从 0 开始,不同分区之间的偏移量都是独立的,不会互相影响。
右图中,发布到Kafka主题的每条消息包括键值和时间戳。
消息到达服务端的指定分区后,都会分配到一个自增的偏移量。
原消息、offset、元数据 => 存储分区日志文件
消息不设置键 => 均衡到不同的分区

在这里插入图片描述

1
2
3
4
5
6
7
8
# 传统 MQ 顺序消费 =====> 1consumer -> 1topic
解决消息顺序消费:一个消费者只消费一个队列
多个消费者消费一个队列,服务端会以消息存储的顺序依次发送给消费者,由于消息是异步发送给消费者的,消息到达消费者的顺序可能是无序的。

# Kafka 顺序消费 =====> 多consumer(同组) -> 1topic(多partition)
partition => 消息处理的并行单元
1p => 1c(分区有序,消费者就有序)
topic 多 partition,不同 consumer 消费不同 partition,有序 && 消费者负载均衡 rebalance。

消费模型

消息由生产者发布到 Kafka 集群后,会被消费者消费,消费模型有两种:
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# push 推送模型
Server 记录消费状态 ===> 推给 consumer,标记已消费。
存在问题:推出去后,consumer 未消费(消费进程挂了、网络原因未收到),Server 标记为已消费,这样就出问题了。
解决办法:发出去,标记“已发送”,收到消费者确认后标记“已消费”。===> (这样不太好,Server 要记录所有消息的消费状态)

# pull 拉取模型
Consumer 记录消费状态 ===> 每个 Consumer 互相独立地顺序读取每个 partition的消息。
如下图 1-4所示,有两个消费者(不同消费组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费者进度是6。
消费者拉取的最大上限通过最高水位( watermark )控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。
这种由 Consumer 控制偏移量的优点:消费者可以按照任意的顺序消费消息。
例如:
1. 消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;
2. 直接跳到最近的位置,从当前时刻开始消费。


# 一些 MQ => 立即删除
Server 会在消息被消费后立即删除消息。
如果不同组的 Consumer 消费同一个 Topic,Server 可能要冗余储存消息;或等所有消费者消费完才删除,这就需要 Server来跟踪每个 Consumer的消费状态,这样会限制吞吐量和处理延迟。

# Kafka => 过期删除(日志)
生产者发布的消息会一直保存在Kafka集群,不论是否被消费。
用户可以设置保留时间来清理过期的数据(默认7天)。
例如:设置保留策略为2天。消息发布后,它可以被不同 Consumer消费,在2天后,过期的消息会被自动清理。

在这里插入图片描述

分布式模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 鸡掰 => p 多副本(1主多从)
# 主读写,从仅同步;主鸡掰,从选主;Server有主也有从;
为了故障容错,每个 partition以副本的形式复制到多个 Server节点。
1个节点主副本(Leader),其他节点从副本(Follower)。
Leader负责读写,Follower仅仅从 Leader同步数据。
Leader鸡掰,会从 Follower中选一个作为 Leader。
=> 因为只有 Leader可以读写,所以每个 Server 持有一些分区 Follower 和 一些分区的 Leader,这样 Kafka集群对 Client来说是负载均衡的。

# producer 无key,轮询(负载均衡)
# producer 有key,同分区
Kafka 生产者和 消费者相对于服务端而言都是客户端,生产者客户端发布消息到服务端的指定主题,会指定消息所属的分区。
生产者发布消息时根据消息是否有键,采用不同的分区策略。
消息没有键时,通过轮询方式进行客户端负载均衡;
消息有键时,根据分区语义确保相同键的消息总是发送到同一个分区。

# consumer 同g => 负载均衡(队列)
# consumer 不同g => 广播(发布/订阅)
consumer 订阅 topic 消费消息,并且每个 consumer 都有一个消费组名称。
生产者发布到 topic 中的每一条消息都只会发给消费组的一个 consumer。
“队列”模型:每个 consumer 都有相同消费组名称,消息会负载均衡到所有 consumer。
“发布/订阅”模型:每个 consumer 消费组名称都不同,每条消息广播所有消费者。

# p最小并行;Server↑ QPS↑
分区是消费者线程模型的最小并行单位。
如图 1-5 (左)所示,生产者发布消息到一台服务器的3个分区时,只有一个消费者消费所有的3个分区。
在图 1-5 (右)中,3个分区分布在 3台服务器上,同时有 3个消费者分别消费不同的分区。
假设每个服务器的吞吐量是300MB ,在图 1-5 (左)中分摊到每个分区只有 100MB,而在图 1-5 (右)中集群整体的吞吐量有 900MB。
可以看到,增加服务器节点会提升集群的性能,增加消费者数量会提升处理性。

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 同组多消费者 => LB => 所有分区
# consumer 加入/离开 => rebalance
同一个消费组下多个消费者互相协调消费工作, Kafka会将所有的分区平均地分配给所有的消费者实例,这样每个消费者都可以分配到数量均等的分区。
Kafka 的消费组管理会动态地维护 consumer列表,当 consumer加入组,或 consumer离开组,都会触发再平衡操作(rebalance)。

# 一分区有序,多分区无序;
# 完全有序:1partition->1consumer
Kafka 的消费者消费消息时,只保证在一个分区内消息的完全有序性,并不保证同一个主题中多个分区的消息顺序。
而且,消费者读取一个分区消息的顺序和生产者写入到这个分区的顺序是一致的。
比如,生产者写入“hello ”和“kafka ”两条消息到分区P1,则消费者读取到的顺序也一定是“hello”和“kafka。
如果业务上需要保证所有消息完全一致,只能通过设置一个分区完成,但这种做法的缺点是最多只能有一个消费者进行消费。
一般来说,只需要保证每个分区的有序性,再对消息加上键来保证相同键的所有消息落入同一个分区,就可以满足绝大多数的应用。

上面从【宏观角度】分析了 Kafka 基本模型,下面分析Kafka在【底层实现】上的一些设计细节与考虑。

Kafka 的设计与实现

1
2
3
4
5
6
7
下面我们会从 3个角度分析 Kafka的一些设计思路,井尝试回答下面 3个问题。

1. 如何利用操作系统的优化技术来高效地持久化日志文件 和加快数据传输效率?

2. Kafka 生产者如何批量地发送消息,消费者采用拉取模型带来的优点都有哪些?

3. Kafka 的副本机制如何工作,当故障发生时,怎么确保数据不会丢失?

fs 持久化与传输

文件系统的持久化与数据传输效率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 磁盘访问优化
# 预读:大磁盘块读入内存
# 后写:多小逻辑写 => 大物理写
# 磁盘缓存:磁盘写 => 磁盘缓存
人们普遍认为一旦涉及磁盘的访问,读写的性能就严重下降。
实际上,现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。
比如
预读( read-ahead )会提前将一个比较大的磁盘块读入内存。
后写( write-behind )会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。
操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存(disk cache/page cache),所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接I/O会绕过磁盘缓存)。
综合这几点优化特点,如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几。

# 数据落盘
# 一般做法:多放内存 => 必要时落盘
# Kafka做法:先磁盘缓存 => 定期自动落盘
如图 1-6 (左)所示,应用程序写人数据到文件系统
一般做法是在内存中保存尽可能多的数据,并在需要时将这些数据刷新到文件系统。
但这里我们要做完全相反的事情,右图中所有的数据都立即写入文件系统的持久化日志文件,但不进行刷新数据的任何调用。
数据会首先被传输到磁盘缓存,操作系统随后会将这些数据定期自动刷新到物理磁盘。

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 生产 -> Server -> 消费(传输效率决定性能)
# × 一次一条(一次网络请求)
# √ 一次一批、压缩(减少网络请求) => 多条按照分区分组
消息系统内的消息从生产者保存到服务端,消费者再从服务端读取出来,数据的传输效率决定了生产者和消费者的性能。
生产者如果每发送一条消息都直接通过网络发送到服务端,势必会造成过多的网络请求。
如果我们能够将多条消息按照分区进行分组,并采用批量的方式一次发送一个消息集,并且对消息集进行压缩,就可以减少网络传输的带宽,进一步提高数据的传输效率。

# 零拷贝技术 -> 减少复制次数
消费者要读取服务端的数据,需要将服务端的磁盘文件通过网络发送到消费者进程,而网络发送通常涉及不同的网络节点。
如图 1-7 (左)所示,传统读取磁盘文件的数据在每次发送到网络时,都需要将页面缓存先保存到用户缓存,然后在读取消息时再将其复制到内核空间,具体步骤如下。
1. (磁盘 -> 内核)操作系统将数据从磁盘中读取文件到内核空间里的页面缓存。
2. (内核 -> 用户)应用程序将数据从内核空间读入用户空间的缓冲区。
3. (用户 -> 内核)应用程序将读到的数据写回内核空间并放入socket缓冲区。
4. (内核 -> 网卡)操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送出去。
结合Kafka 的消息有多个订阅者的使用场景,生产者发布的消息一般会被不同的消费者消费多次。
如图 1-7 (右)所示,使用“零拷贝技术”( zero-copy )只需将磁盘文件的数据复制到页面缓存中1次,
然后将数据从页面缓存直接发送到网络中(发送给不同的使用者时,都可以重复使用同一个页面缓存),避免了重复的复制操作。
这样,消息使用的速度基本上等同于网络连接的速度了。

# 案例:传统40次 -> 零拷贝11次
这里我们用一个示例来对比传统的数据复制和“零拷贝技术”这两种方案。
假设有 10个消费者,传统复制方式的数据复制次数是4x10=40次,而“零拷贝技术”只需1+10=11次
(一次表示从磁盘复制到页面缓存,另外10次表示 10个消费者各自读取一次页面缓存)。
显然,“零拷贝技术”比传统复制方式需要的复制次数更少(越少的数据复制,就越能更快地读取到数据;延迟越少,消费者的性能就越好)。

在这里插入图片描述

生产者与消费者

1
2
3
4
5
6
7
8
9
10
11
# producer 请求元数据 -> 消息 -> leader所在节点 
Kafka 生产者将消息直接发送给分区主副本所在的消息代理节点,并不需要经过任何的中间路由层。
为了做到这一点,所有消息代理节点都会保存一份相同的元数据,这份元数据记录了每个主题分区对应的主副本节点。
生产者客户端在发送消息之前,会向任意一个代理节点请求元数据,井确定每条消息对应的目标节点,然后把消息直接发送给对应的目标节点。

# 两种方式发送:1、随机 2、KEY 同分区
如图1-8所示,生产者客户端有两种方式决定发布的消息归属于哪个分区:
1. 通过随机方式将请求负载到不同的消息代理节点(图 1-8左图),
2. 或者使用“分区语义函数”将相同键的所有消息发布到同一个分区(图 1-8 右图)。
对于分区语义, Kafka暴露了一个接口,允许用户指定消息的键如何参与分区。
比如,我们可以将用户编号作为消息的键,因为对相同用户编号散列后的值是罔定的,所以对应的分区也是固定的。

在这里插入图片描述

1
2
3
4
5
6
7
8
9
# 批量发送:达到时间发送 or 达到大小发送
上面我们讲到,生产者采用批量发送消息集的方式解决了网络请求过多的问题。
生产者会尝试在内存中收集足够数据,并在一个请求中一次性发送一批数据。
另外,我们还可以为生产者客户端设置“在指定的时间内收集不超过指定数量的消息”。
比如,设置消息大小上限等于64字节,延迟时间等于100毫秒,表示:
1. 在100毫秒内消息大小达到64字节要立即发送;
2. 如果在100毫秒时还没达到64字节,也要把已经收集的消息发送出去。
客户端采用这种缓冲机制,在发送消息前会收集尽可能多的数据,通过每次牺牲一点点额外的延迟来换取更高的吞吐量。
相应地,服务端的I/O消耗也会大大降低。

1
2
3
4
5
6
7
8
9
10
11
12
# 推 => 不同消费者速率不同
如图 1-9所示,消费者读取消息有两种方式。
第一种是消息代理主动地“推送”消息给下游的消费者(图 1-9左图),由消息代理控制数据传输的速率,但是消息代理对下游消费者是否能及时处理不得而知。
如果生产快消费慢,会消息积压得越来越多。
而且,推送方式也难以应付不同类型的消费者,因为不同消费者的消费速率不一定都相同,
消息代理要调整不同消费者的传输速率,并让每个消费者充分利用系统的资源。这种方式实现起来比较困难。

# 拉 => 按照自己能力拉取
第二种读取方式是消费者从消息代理主动地“拉取”数据(见图 1-9右图),
消息代理是无状态的,它不需要标记哪些消息被消费者处理过,也不需要保证一条消息只会被一个消费者处理。
而且,不同的消费者可以按照自己最大的处理能力来拉取数据,即使有时候某个消费者的处理速度稍微落后,
它也不会影响其他的消费者,并且在这个消费者恢复处理速度后,仍然可以追赶之前落后的数据。

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 推 => 状态维护麻烦(“已发送”、“已消费”)
因为消息系统不能作为严格意义上的数据库,所以保存在消息系统中的数据,在不用之后应该及时地删除掉并释放磁盘空间。
消息需要删除,其原因一般是消息被消费之后不会再使用了,大多数消息系统会在消息代理记录关于消息是否已经被消费过的状态:
当消息从消息代理发送给消费者时(基于推送模型),消息代理会在本地记录这条消息“已经被消费过了”。
但如果消费者没能处理这条消息(比如由于网络原因、请求超时或消费者挂掉),就会导致“消息丢失”。
解决消息丢失的一种办法是添加应答机制,消息代理在发送完消息后只把消息标记为“已发送”,只有收到消费者返回的应答信息才表示“己消费”。
但还是会存在一个问题:消费者处理完消息就失败了,导致应答没有返回给消息代理,这样消息代理又会重新发送消息,导致消息被重复处理。
这种方案还有一个缺点:消息代理需要保存每条消息的多种状态(比如,消息状态为“已发送”时,消息代理需要锁住这条消息,保证消息不会发送两次),这种方式需要在客户端和服务端做一些复杂的状态一致性保证。

# Kafka消费者 => 分区消费进度(offset)
Kafka采用了基于拉取模型的消费状态处理,它将主题分成多个有序的分区,任何时刻每个分区都只被一个消费者使用。
并且,消费者会记录每个分区的消费进度( 即偏移量)。
每个消费者只需要为每个分区记录一个整数值,而不需要像其他消息系统那样记录每条消息的状态。
假设有 1万条消息,传统方式需要记录 1万条消息的状态;
如果用Kafka的分区机制,假设有 10个分区,每个分区 1千条消息,总共只需要记录 10分区的消费状态(需要保存的状态数据少了很多,而且也没有了锁)。

# 传统MQ => 消费者ACK,Server做处理
# Kafka消费者 => 定时保存分区消费进度(可回到旧位置重新处理)
和传统方式需要跟踪每条消息的应答不同, Kafka的消费者会定时地将分区的消费进度保存成检查点文件,表示“这个位置之前的消息都已经被消费过了”。
传统方式需要消费者发送每条消息的应答,服务端再对应答做出不同的处理;
而Kafka 需要让消费者记录消费进度,服务端不需要记录消息的任何状态。
除此之外,让消费者记录分区的消费进度还有一个好处:消费者可以“故意”回退到某个旧的偏移量位置,然后重新处理数据。
虽然这种处理方式看起来违反了队列模型的规定(一条消息发送给队列的一个消费者之后,就不会被其他消费者再次处理),但在实际运用中,很多消费者都需要这种功能。
比如,消费者的处理逻辑代码出现了问题,在部署并启动消费者后,需要处理之前的消息并重新计算。


# 生产者批发:消费者批拉
# 拉缺点:空轮询 => 阻塞式、长轮询
和生产者采用批量发送消息类似,消费者拉取消息也可以一次拉取一批消息。
消费者客户端拉取消息,然后处理这一批消息,这个过程一般套在一个死循环里,表示消费者永远处于消费消息的状态(因为消息系统的消息总是一直产生数据,所以消费者也要一直消费消息)。
消费者采用拉取方式消费消息有一个缺点:如果消息代理没有数据或者数据量很少,消费者可能需要不断地轮询,并等待新数据的到来
(拉取模式主动权在消费者手里,但是消费者并不知道消息代理有没有新的数据;如果是推送模式,只有新数据产生时,消息代理才会发送数据给消费者,就不存在这种问题)。
解决这个问题的方案是:允许消费者的拉取请求以阻塞式、长轮询的方式等待,直到有新的数据到来。
我们可以为消费者客户端设置“指定的字节数量”,表示消息代理在还没有收集足够的数据时,客户端的拉取请求就不会立即返回。

副本机制和容错处理

1
2
3
4
5
6
7
8
9
10
11
12
13
# 1主*从   => 分区日志复制
Kafka的副本机制会在多个服务端节点(简称节点,即消息代理节点)上对每个主题分区的日志进行复制。
当集群中的某个节点出现故障时,访问故障节点的请求会被转移到其他正常节点的副本上。
副本的单位是主题的分区,Kafka每个主题的每个分区都有一个主副本以及0个或多个备份副本。
备份副本会保持和主副本的数据同步,用来在主副本失效时替换为主副本。

# 读写 => 主
# 同分区:热点
# 不同分区:负载(有主也有从)
如图 1-10所示,所有的读写请求总是路由到分区的主副本。
虽然生产者可以通过负载均衡策略将消息分配到不同的分区,但如果这些分区的主副本都在同一个服务器上(见图 1-10左图),就会存在数据热点问题。
因此,分区的主副本应该均匀地分配到各个服务器上(见图 1-10右图)。
通常,分区的数量要比服务器多很多,所以每个服务器都可以成为一些分区的主副本,也能同时成为一些分区的备份副本。

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 从 => 与主同步
# 从(consumer) => 消费主分区,消息持久化到分区日志
备份副本始终尽量保持与主副本的数据同步。
备份副本的日志文件和主副本的日志总是相同的,它们都有相同的偏移量和相同顺序的消息。
备份副本从主副本消费消息的方式和普通的消费者一样,只不过备份副本会将消息运用到自己的本地日志文件(备份副本和主副本都在服务端,它们都会将收到的分区数据持久化成日志文件)。
普通的消费者客户端拉取到消息后并不会持久化,而是直接处理。

分布式系统故障容错时,需要明确地定义节点是否处于存活状态。Kafka对节点的存活定义有两个条件:
1. 节点必须和ZK保持会话;
2. 如果这个节点是某个分区的备份副本,它必须对分区主副本的写操作进行复制,并且复制的进度不能落后太多

# ISR
# 挂了、无响应、落后多 => 移除
# 重新恢复 => 加入
满足这两个条件,叫作“正在同步中”( in-sync )。
每个分区的主副本会跟踪正在同步中的备份副本节点( In Sync Replicas ,即ISR )。
如果一个备份副本挂掉、没有响应 或落后太多,主副本就会将其从同步副本集合中移除。
反之,如果备份副本重新赶上主副本,它就会加入到主副本的同步集合中。

# 提交 => 可见
# 未提交 => 不可见
在Kafka中,一条消息只有被ISR集合的所有副本都运用到本地的日志文件,才会认为消息被成功提交了。
任何时刻,只要ISR至少有一个副本是存活的,Kafka就可以保证“一条消息一旦被提交,就不会丢失”。
只有已经提交的消息才能被消费者消费,因此消费者不用担心会看到因为主副本失败而丢失的消息。
下面我们举例分析Kafka的消息提交机制如何保证消费者看到的数据是一致的。
+ 生产者发布了 10条消息,但都还没有提交(没有完全复制到ISR中的所有副本)。
如果没有提交机制,消息写到主副本的节点就对消费者立即可见,即消费者可以立即看到这 10条消息。
但之后主副本挂掉了,这 10条消息实际上就丢失了。
而消费者之前能看到这 10条丢失的数据,在主副本挂掉后就看不到了,导致消费者看到的数据出现了不一致。

+ 如果有提交机制的保证,并且生产者发布的 10条消息还没有提交,则对消费者不可见。
即使这 10条消息都已经写入主副本,但是它们在还没有来得及复制到其他备份副本之前,主副本就挂掉了。
那么,这 10条消息就不算写入成功,生产者会重新发送这 10条消息。
当这 10条消息成功地复制到ISR的所有副本后,它们才会认为是提交的,即对消费者才是可见的。
在这之后,即使主副本挂掉了也没有关系,因为原先消费者能看到主副本的 10条消息,在新的主副本上也能看到这10条消息,不会出现不一致的情况。

下面我们开始做一些简单的实验,通过观察结果来更形象地理解Kafka的一些基本概念。

快速开始

快速开始—Kafka官方文档

1
2
3
4
5
tar xf kafka_2.13-3.2.1.tgz
cd kafka_2.13-3.2.1

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

单机模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 创建 topic test01(1副本1分区)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test01
Created topic test01.

# 查看 topic 列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test01

# 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test01
>first
>second
>^C

# 消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test01 --from-beginning
first
second
^CProcessed a total of 2 messages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# 创建 topic test02(1副本3分区)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test02
Created topic test02.

# 查看详情 topic => test01(1副本1分区) test02(1副本3分区)
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test01
Topic: test01 TopicId: tQcnLAv6QpSspbEyiNlg7g PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test02
Topic: test02 TopicId: jhuQ6ToVR3ycUATHEoeo9Q PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test02 Partition: 2 Leader: 0 Replicas: 0 Isr: 0

# 查看kafka日志目录(log.dir配置)
tree /tmp/kafka-logs/
/tmp/kafka-logs/
├── cleaner-offset-checkpoint
├── log-start-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
├── replication-offset-checkpoint
├── test01-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ ├── leader-epoch-checkpoint
│ └── partition.metadata
├── test02-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ ├── leader-epoch-checkpoint
│ └── partition.metadata
├── test02-1
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ ├── leader-epoch-checkpoint
│ └── partition.metadata
└── test02-2
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
├── leader-epoch-checkpoint
└── partition.metadata

# 查看 test01 P0 日志文件
strings /tmp/kafka-logs/test01-0/00000000000000000000.log
first
second

# 发送 6条消息 => test02
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test02
>message1
>message2
>message3
>message4
>message5
>message6
>^C

# test02 分区P0(1,4,6) P1(2,5) P2(3)
strings /tmp/kafka-logs/test02-0/00000000000000000000.log
message1
message4
message6
strings /tmp/kafka-logs/test02-1/00000000000000000000.log
message2
message5
strings /tmp/kafka-logs/test02-2/00000000000000000000.log
message3

# 启动一个 consumer 去消费 test02 ===> 多分区无序(但是同一个分区是有序的,1一定在4前面,4一定在6前面)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning
message1 => 分区P0
message4
message6
message2 => 分区P1
message5
message3 => 分区P2
^CProcessed a total of 6 messages

# 查看分区偏移量
cat /tmp/kafka-logs/replication-offset-checkpoint
0
4 => 共4个分区
test01 0 2 => 主题 分区号 偏移量
test02 0 3
test02 1 2
test02 2 1

cat /tmp/kafka-logs/recovery-point-offset-checkpoint
0
4
test01 0 0
test02 0 0
test02 1 0
test02 2 0

分布式模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# 再启动 3台 kafka
1 server.properties broker.id=0 #listeners=PLAINTEXT://:9092 log.dirs=/tmp/kafka-logs
2 server1.properties broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs1
3 server2.properties broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs2
4 server3.properties broker.id=3 listeners=PLAINTEXT://:9095 log.dirs=/tmp/kafka-logs3

cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties
cp config/server.properties config/server3.properties

server.properties
broker.id=0
#listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
server1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1
server2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs2
server3.properties
broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/tmp/kafka-logs3

bin/kafka-server-start.sh config/server1.properties
bin/kafka-server-start.sh config/server2.properties
bin/kafka-server-start.sh config/server3.properties

netstat -atlunp | grep 909* | grep LISTEN
tcp6 0 0 :::9092 :::* LISTEN 15951/java
tcp6 0 0 :::9093 :::* LISTEN 1929/java
tcp6 0 0 :::9094 :::* LISTEN 10163/java
tcp6 0 0 :::9095 :::* LISTEN 17230/java

# 创建 topic 3副本,分别1分区,3分区,5分区
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic replicated-p1
Created topic replicated-p1.
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic replicated-p3
Created topic replicated-p3.
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 5 --topic replicated-p5
Created topic replicated-p5.


# 查看详情
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p1
Topic: replicated-p1 TopicId: k_jFlDVpR9ektxaEgSHHvw PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p3
Topic: replicated-p3 TopicId: jKy4NOCDQheTMYLy48dKqA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p3 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: replicated-p3 Partition: 1 Leader: 3 Replicas: 3,1,0 Isr: 3,1,0
Topic: replicated-p3 Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p5
Topic: replicated-p5 TopicId: UzKuWqTvT0uNs2fZpqlfwg PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p5 Partition: 0 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: replicated-p5 Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: replicated-p5 Partition: 2 Leader: 3 Replicas: 3,0,2 Isr: 3,0,2
Topic: replicated-p5 Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: replicated-p5 Partition: 4 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

# 杀掉节点3,再看详情
jps -lm
15951 kafka.Kafka config/server.properties
1929 kafka.Kafka config/server1.properties
10163 kafka.Kafka config/server2.properties
17230 kafka.Kafka config/server3.properties
14474 org.apache.zookeeper.server.quorum.QuorumPeerMain config/zookeeper.properties

kill -9 17230

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p1
Topic: replicated-p1 TopicId: k_jFlDVpR9ektxaEgSHHvw PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p3
Topic: replicated-p3 TopicId: jKy4NOCDQheTMYLy48dKqA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p3 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: replicated-p3 Partition: 1 Leader: 1 Replicas: 3,1,0 Isr: 1,0
Topic: replicated-p3 Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p5
Topic: replicated-p5 TopicId: UzKuWqTvT0uNs2fZpqlfwg PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p5 Partition: 0 Leader: 0 Replicas: 0,3,1 Isr: 0,1
Topic: replicated-p5 Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: replicated-p5 Partition: 2 Leader: 0 Replicas: 3,0,2 Isr: 0,2
Topic: replicated-p5 Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 1,2
Topic: replicated-p5 Partition: 4 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

# 再启动节点3,再看详情
bin/kafka-server-start.sh config/server3.properties

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p1
Topic: replicated-p1 TopicId: k_jFlDVpR9ektxaEgSHHvw PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p3
Topic: replicated-p3 TopicId: jKy4NOCDQheTMYLy48dKqA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p3 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3
Topic: replicated-p3 Partition: 1 Leader: 1 Replicas: 3,1,0 Isr: 1,0,3
Topic: replicated-p3 Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p5
Topic: replicated-p5 TopicId: UzKuWqTvT0uNs2fZpqlfwg PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p5 Partition: 0 Leader: 0 Replicas: 0,3,1 Isr: 0,1,3
Topic: replicated-p5 Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: replicated-p5 Partition: 2 Leader: 3 Replicas: 3,0,2 Isr: 0,2,3
Topic: replicated-p5 Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: replicated-p5 Partition: 4 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

# 所有 topic leader 重分配所有节点,再查看详情
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type=PREFERRED --all-topic-partitions

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p1
Topic: replicated-p1 TopicId: k_jFlDVpR9ektxaEgSHHvw PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p3
Topic: replicated-p3 TopicId: jKy4NOCDQheTMYLy48dKqA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p3 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3
Topic: replicated-p3 Partition: 1 Leader: 3 Replicas: 3,1,0 Isr: 1,0,3 # leader 1->3
Topic: replicated-p3 Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-p5
Topic: replicated-p5 TopicId: UzKuWqTvT0uNs2fZpqlfwg PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicated-p5 Partition: 0 Leader: 0 Replicas: 0,3,1 Isr: 0,1,3
Topic: replicated-p5 Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: replicated-p5 Partition: 2 Leader: 3 Replicas: 3,0,2 Isr: 0,2,3
Topic: replicated-p5 Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: replicated-p5 Partition: 4 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

消费组示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-consumer
Created topic test-consumer.

# 生产者:发送前,先启动下面消费者1、消费者2
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-consumer
>message1
>message2
>message3
>message4
>message5
>message6
>message7
>^C

# 消费者1 组test-consumer-group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-consumer --group=test-consumer-group --from-beginning
message2
message4
message6
^CProcessed a total of 3 messages

# 消费者2 组test-consumer-group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-consumer --group=test-consumer-group --from-beginning
message1
message3
message5
message7
^CProcessed a total of 4 messages

# 消费者3 随机组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-consumer --from-beginning
message1
message2
message3
message4
message5
message6
message7
^CProcessed a total of 7 messages

# 查看消费者组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
Consumer group 'test-consumer-group' has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-consumer-group test-consumer 0 3 3 0 - - -
test-consumer-group test-consumer 1 4 4 0 - - -

天博

在这里插入图片描述