Kafka总结(二)常见组件下 Topic、Partition等介绍


1. CAP理论

分布式系统中,一致性,可用性,分区容忍性大多数情况下只能同时满足两个,一般分区容忍性都要有保障,因此很多时候是在可用性可一致性之间做权衡。

  1. 一致性(Consistency)

    • 通过某个节点的写操作结果对后面通过其他节点的读操作可见
    • 如果更新数据后,并发访问的情况下可立即感知更新,成为强一致性
    • 如果允许之后部分或者全部感知不到该更新,成为弱一致性
    • 弱在之后的一段时间(通常该事件不固定)后,一定可以感知该更新,称为最终一致性
  2. 可用性(Availability)

    • 任何一个没有发生故障的节点必须在有限的时间内返回合理的结果,响应时间尽可能短。
  3. 分区容忍性(Partition tolerance)

    • 部分节点宕机或者无法与其他节点通信时,各分区还可以保持分布式系统的功能

1.2 分布式系统中的选择:

C-P (一致性-分区容忍性):保证数据的强一致性,如果不相同数据不返回
HBase、Redis、MongoDB

A-P(可用性-分区容忍性):保证在有效时间内数据返回, 数据可能会不一致
Dynamo、Cassandra、Kafka

C-A (一致性-可用性):保证在有效时间内返回最新数据,如果节点宕机则无法使用
RDBMS,因为像MySQL、Oracle 等关系型数据库一般不做分布式,所以不需要满足分区容忍性

1.3 一致性方案

  • Master-slave
    RDBMS 的读写分离即为典型的 master-slave 方案
    同步复制可保证强一致性,但可能会影响性能,需要等待所有节点写入成功才会返回写入成功。
    异步复制可提供高可用性,但会减低一致性,数据写进master立即返回写成功

  • WNR
    主要用于去中心化(P2P)的分布式系统中,DynamoDB 与 Cassandra 即采用此方案
    N代表副本数,W 代表写操作要保证的最少写成功副本数,R 代表每次读至少读取的副本数
    当W+R>N时,可保证每次读取的数据至少有一个副本具有最新的更新
    多个写操作时的顺序难以保证,可能导致多个副本间的写操作顺序不一致,Dynamo通过向量时钟保证最终一致性

  • Paxos 及其变重
    Google 的 chubby,Zookeeper 的 zab,RAFT 等

2. Topic

  • Topic(主题)是一个逻辑概念,同一个 Topic 的消息可分布在一个或多个节点(broker)上
  • Topic 相当于传统消息系统 MQ 中的一个队列 queue
  • 一个 Topic 可以包含一个或者多个 Partition
  • 每条消息都属于而且仅仅属于一个 Topic
  • Producer 发布数据时,必须指定将该数据发布到那个 Topic,但是不需要指定发送到 Topic 下的哪个 Partition,因为 kafka 会把收到的消息进行load balance,均匀的分布在这个Topic下的不同Partition上(默认hash Partitioner)。
  • consumer订阅消息时,必须指定定订阅哪个Topic的消息

2.1 Topic相关操作

#创建topic
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test1 --partitions 3 --replication-factor 1
Created topic "test1".
[root@node2 kafka_2.12-1.0.2]#

#查看所有topic
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181  --list
test1

# 查看topic 状态
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test1
Topic:test1    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: test1    Partition: 2    Leader: 1    Replicas: 1    Isr: 1
#删除topic
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --delete --topic test1
Topic test1 is marked for deletion.

2.2 创建和删除Topic的流程

  1. Controller 在 Zookeeper 的 /brokers/topics 节点上注册Watch,一旦某个 Topic 被创建或删除,则该 Controller 会通过 Watch 得到新 创建/删除 的 Topic 的 Partition/replica 分配情况

  2. 对于删除操作:Topic 工具会将该 Topic 名字保存于 /admin/delete_topics

    • 若delete.topic.enable为true,则Controller注册在/admin/delete_topics上的 Watch 被触发,Controller通过调用对应的Broker发送StopReplicaRequest
    • 若delete.topic.enable为false,则不会做任何操作。
  3. 对于创建操作:Controller 从 /brokers/ids 读取所有可用的 Broker 列表,对于 Set_p 中每个Partition:

    • 从分配给该 Partition 中所有的 Replica(All Replica, AR)中选择任意一个可用的Broker 作为Leader,并将AR设置为新的ISR,因为Topic是新创建的所以AR中所有的Replica都没有数据,也都在ISR中,都可以作为Leader。
    • 将新的Leader和ISR写入/brokers/topics/topic-name/partitions/partition-id
  4. 直接通过RPC向相关的Broker发送 LeaderAndISRRequest。

3. Partition

  • 在物理存储上,一个Topic会分成一个或多个Partition,每个Partition相当于是一个子queue。
  • 在物理结构上,一个Partition对应一个物理的目录(文件夹),文件夹的名称是topicname_partition_序号。
  • 一个Topic可以有一个或多个Partition,这是视业务需求来设置。

Topic的Partitions图示

3.1 Partition 特点

  1. Partition 是一个物理概念,一个Partition只分布于一个broker上(不考虑备份(replica))
  2. 一个 Partition 物理磁盘上对应一个文件夹
  3. 一个 Partition 包含多个 Segment,一个 Segment 对应一个文件(Segment对用户透明,用户并不需要知道Segment的存在)
  4. Segment 由一个个不可变的记录组成,记录只会append到Segment中,不会被单独删除或者修改
  5. 清理过期日志时直接删除Segment,Partition保存数据大小,后台进程周期性检查(日志默认保留168小时)

Partition 数据写入

3.2 Partition 数量

Topic 的默认 Partition 数量在server.properties 配置文件中num.partitions参数来设置,也可以在创建Topic时指定Partition数量,如下命令,创建一个名为test2的Topic 这个Topic有2个分区。

#创建topic
[root@node2 kafka_2.12-1.0.2]# bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 1 --partitions 2 --topic test2
Created topic "test2".

#查看分区情况
[root@node2 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test2
Topic:test2    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test2    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    Topic: test2    Partition: 1    Leader: 0    Replicas: 0    Isr: 0

#topic test2 共有两个分区分别在node1 和node3上面

#还可以在创建好的topic之上增加分区,如下命令,我们将上面创建的2个分区增加到3个分区。
[root@node3 kafka_2.12-1.0.2]# bin/kafka-topics.sh --alter --topic test2 --zookeeper node1:2181,node2:2181,node3:2181 --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

#查看分区情况
[root@node3 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test2
Topic:test2    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test2    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    Topic: test2    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: test2    Partition: 2    Leader: 1    Replicas: 1    Isr: 1

# 可以看到新增的分区被分配到了原来没有分区的node2上面
  • 一般来说,一个Topic的Partition数量大于等于broker的数量可以提高吞吐量,同一个Partition的replica(副本)尽量分散到不同的机器,以实现高可用。
  • 当增加一个新的Partition的时候,Partition里面的数据不会重新分配,原来的Partition里面的数据不会变化,新增加的这个Partition刚开始是空的,随后进入到这个Topic的数据就会重新进行load balance 将数据均匀分配到所有Partition。

需要注意的是kafka的分区只支持增加,不建议减少也不支持减少。当减少分区时需要设计到减去的这个分区的数据需要分配到其他分区,其中还包含该分区副本的分配问题,以及该分区的offset等一系列问题。所以不建议减少分区。

3.3 Partition 副本

有时为了保证高可用,会为分区会有多个相同的数据,这些冗余的数据就称为该Partition分区的副本(replica),这样一来如果有部分服务器不可用,副本所在的服务器就会接替上来,保证应用的持续性。

3.3.1 Partition 副本存储介绍

 # 查看topic 状态
 [root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test1
 Topic:test1    PartitionCount:3    ReplicationFactor:1    Configs:
     Topic: test1    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
     Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
     Topic: test1    Partition: 2    Leader: 1    Replicas: 1    Isr: 1

 #node1
 [root@node2 ~]# cd /opt/kafka_2.12-1.0.2/kafka-logs/
 [root@node2 kafka-logs]# ll
 drwxr-xr-x. 2 root root 4096 2月  10 16:51 test1-1
 # test1-1:test1 为topic 名称
 # test1-1:1 为partition编号

 #node2
 [root@node2 ~]# cd /opt/kafka_2.12-1.0.2/kafka-logs/
 [root@node2 kafka-logs]# ll
 drwxr-xr-x. 2 root root 4096 2月  10 16:51 test1-2

 # node3
 [root@node3 ~]# cd /opt/kafka_2.12-1.0.2/kafka-logs/
 [root@node3 kafka-logs]# ll
 drwxr-xr-x. 2 root root 4096 2月  10 16:51 test1-0

从上面的代码中可以看出Topic test1数据保存在/opt/kafka_2.12-1.0.2/kafka-logs/ 路径下,共有三个Partition均匀分布在node1、node2、node3上面,Partition只有一个副本。

3.3.2 Partition 副本默认分布

Kafka集群 Partition replication 默认自动分配分析

下面以一个Kafka集群中3个broker举例,创建一个Topic包含3个Partition并且每个Partition有2个replication 。数据producer流动如下图所示:

Partition 与副本分布图示

副本分配逻辑规则如下:

  • 在Kafka集群中,每个broker都有均等分配Partition的leader的机会。

  • 上述broker Partition中,箭头指向副本,以Partition-0为例,broker1中partition-0为leader,broker2中partition-0为副本。

  • 上述图中每个broker(按照brokerID有序)依次分配住Partition,下一个broker为副本,如此循环迭代分配,多副本都遵循此规则。

副本分配算法如下:

  • 将所有n个broker和待分配的i个Partition排序。

  • 将第 i 个Partition分配到第(i mod n)个broker上。

  • 将第 i 个Partition的第j个副本分配到((i + j) mod n)个broker上。

3.3.3 Partition 副本同步

Kafka作为分布式系统,为了实现HA,采用多副本机制,并确保Leader Crash时,Follower能接管服务,这就要求备Follower和Leader一直保持同步,避免Leader异常是Follower数据丢失。

先看一下相关概念:

概念 说明
ISR ISR(in-sync replica) 为某个分区维护的一组同步集合,即每个分区都有自己的一个 ISR 集合,处于 ISR 集合中的副本,意味着 follower 副本与 leader 副本保持同步状态,只有处于 ISR 集合中的副本才有资格被选举为 leader。一条 Kafka 消息,只有被 ISR 中的副本都接收到,才被视为“已同步”状态。这跟 zk 的同步机制不一样,zk 只需要超过半数节点写入,就可被视为已写入成功。
Hight Watermark 副本水位值,表示分区中最新一条已提交(Committed)的消息的Offset。
LEO Log End Offset,Leader 中最新消息的Offset。
Committed Message 已提交消息,已经被所有 ISR 同步的消息。
Lagging Message 没有达到所有ISR同步的消息。

概念说明图示

每个Partition有一个预写式日志(write-ahead log)文件,每个 Partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每个消息都有一个连续的序列号叫做offset, 确定它在分区日志中唯一的位置(上图中的每个数字都是一个offset都会对应Producer发送的一条数据)。

每个Partition都可以有多(N)个副本,N就是Topic的复制因子,在kafka中发生复制时确保Partition的预写日志都有序地写到其他节点上,N个副本中,有一个副本为Leader,其他副本都为Follower,Leader处理Partition的所有请求,同时Follower会定期的去Leader上 拉取数据。
如下图有三个broker,Topic1中有三个副本,则复制因此副本个数为3。

Partition 副本同步

Kafka 同步数据流程
从上图可看出,leader 的 remote LEO(Log End Offset) 的值相对于 follower LEO 值,滞后一个 follower RPC 请求,remote LEO 决定 leader HighWaterMark 值的大小。

同时,Leader 副本永远领先 Follower 副本 一个RPC请求,且各个 follower 副本之间的消息最新位移也不尽相同,Kafka 必须要定义一个落后 Leader 副本位移的范围,使得处于这个范围之内的 Follower 副本被认为与 Leader 副本是处于同步状态的,即处于 ISR 集合中。

介绍一下 LEO(Log End Offset) 和 HighWaterMark 值的更新机制:

  1. LEO 更新:

    • Leader 副本自身的 LEO 值更新:在 Producer 消息发送过来时,即 leader 副本当前最新存储的消息位移位置 +1;
    • Follower 副本自身的 LEO 值更新:从 leader 副本中 fetch 到消息并写到本地日志文件时,即 follower 副本当前同步 leader 副本最新的消息位移位置 +1;
    • Leader 副本中的 remote LEO 值更新:每次 follower 副本发送 fetch 请求都会包含 follower 当前 LEO 值,leader 拿到该值就会尝试更新 remote LEO 值。
  2. Leader HighWaterMark 更新:

    • 正常时更新:

      • Producer 向 Leader 副本写入消息时:在消息写入时会更新 Leader LEO 值,因此需要再检查是否需要更新 HW 值;
      • Leader 处理 Follower FETCH 请求时:Follower 的 fetch 请求会携带 LEO 值,leader 会根据这个值更新对应的 remote LEO 值,同时也需要检查是否需要更新 HW 值。
    • 故障时更新:

      • 副本被选为 Leader 副本时:当某个 Follower 副本被选为分区的 leader 副本时,kafka 就会尝试更新 HW 值;
      • 副本被踢出 ISR 时:如果某个副本追不上 Leader 副本进度,或者所在 broker 崩溃了,导致被踢出 ISR,Leader 也会检查 HW 值是否需要更新,毕竟 HW 值更新只跟处于 ISR 的副本 LEO 有关系。
  1. Follower HW 更新:
    • Follower 更新 HW 发生在其更新 LEO 之后,每次 Follower Fetch 响应体都会包含 Leader 的 HW 值,然后比较当前 LEO 值,取最小的作为新的 HW 值。

Leader与Follower消息同步异常一般有以下三种情况:

  1. 慢副本:在一定周期内Follower同步的数据无法赶上Leader,比如在某段时间内由于Follower磁盘IO或者网络出现异常,导致数据写入速度变慢。
  2. 卡住副本:Follower在某段时间内没有向Leader发送拉取同步数据(Fetch)的请求,比如由于GC导致进程卡住。
  3. 新启动副本:由于副本因子(offsets.topic.replication.factor)的变化导致新增副本,此副本不会在ISR列表中,知道同步的消息追赶上Leader。

异常解决:
replica.lag.time.max.ms 参数主要作用是检测上面1、2 两种情况,

  • 如果Follower 拉取数据的时间超过该值,则认为节点卡住,会将该节点(副本)剔除ISR
  • 如果Follower 连续出现多次出现同步延迟超过此阈值时,就会被认为是慢副本,被踢出ISR。

3.3.4 Partition 副本重新分配

先了解几个概念

  • AR:分配给该 Partition 的所有副本(Current Assigned Replicas)
  • OAR:分区副本的原始列表(Original list of replicas for Partition)
  • RAR:重新分配的副本(Reassigned replicas)

管理工具发出重新分配 Partition副本请求之后,会将相应信息写到 /admin/reassign_partitions上,而且该操作会触发 ReassignedPartitionsIsrChangeListener ,从而通过执行回调函数 KafkaController.onPartitionReassignment 来完成以下操作:

  • Partition 重新分配的过程分析
    某个分区有三个副本分别在broker1,2,3上面,要将这三个副本移动到 broker4,5,6上

OAR = {1,2,3},RAR = {4,5,6},Partition 重新分配过程中 ZooKeeper 中的 AR 和 Leader/ISR 路径如下

AR leader/isr Sttep 说明
{1,2,3} 1/{1,2,3} (init)
{1,2,3,4,5,6} 1/{1,2,3} (step 2) AR=OAR + RAR,加入之后会同步数据
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4) 当副本数据同步完成之后会加入到ISR中,这时ISR= AR= OAR+ RAR
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7) 在RAR中选择一个新的Leader
{1,2,3,4,5,6} 4/{4,5,6} (step 8) 从 ISR 中减去 OAR
{4,5,6} 4/{4,5,6} (step 10) 从AR中减去OAR
  1. 将 Zookeeper 中的 AR 更新为 OAR + RAR(当前的Partition副本分布列表更新为旧的副本分布列表和要重新分配的副本列表)
  2. 强制更新 Zookeeper 中的Leader epoch,向AR中的每个副本发送 LeaderAndIsrRequest。
  3. 将 RAR 减去 OAR (重新分配的副本但是不在旧的副本列表中的副本)设置为NewReplica 状态。
  4. 等待直到 RAR 中所有的副本都与其 Leader 同步。
  5. 若 RAR 中有的副本都设置为 OnlineReplica 状态
  6. 将 Cache 中的AR设置为 RAR。
  7. 若 Leader 不在RAR中,则从 RAR 中重新选择出一个新的 Leader 并发送LeaderAndIsrRequest。若新的Leader 不是从 RAR 中选举而出,则还要增加 Zookeeper 中的Leader epoch。
  8. 将 OAR 减去 RAR 中所有的Replica 设置为 OfflineReplica 状态,该过程包含两部分。
    • 1.将Zookeeper上的OAR - RAR 移除并向Leader发送LeaderAndIsrRequest,从而通知这些 Replica 已经从 ISR 中移除。
    • 2.向 OAR - RAR 中的 Replica 发送StopReplicaRequest 从而停止不在给该 Partition 的Replica。
  9. 将 OAR 减去 RAR 中的所有 Replica 设置为 NonExistentReplica 状态,从而将其从磁盘上删除。
  10. 将 Zookeeper 中的 AR 设置为 RAR。
  11. 删除 /admin/reassign_partition节点

注意:最后一步才将 Zookeeper 中的AR更新,因为这是唯一一个持久存储 AR 的地方,如果 COntroller 在这一步之前宕机,新的Controller 仍然能够继续完成该过程。

3.4 Partition Leader 与 Follower

  • 为了保证较高的处理效率,消息的读写都是在固定的一个副本上完成。负责读写这个副本就是所谓的Partition Leader,而其他副本则是Partition Follower。
  • Producer将数据发送到kafka的时候先将数据写入到Partition Leader,再由Partition Leader push给Partition Follower。
  • Partition Leader与Follower的信息受zookeeper管理,一旦Partition Leader所在的broker节点宕机。zookeeper会在其它的broker 的Partition Follower中选择一个Follower变为Partition Leader。
# 创建一个topic 该topic有三个partition 和三个副本(replica)
[root@node3 kafka_2.12-1.0.2]# bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 3 --partitions 3 --topic test3
Created topic "test3".
[root@node3 kafka_2.12-1.0.2]#
[root@node3 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test3
Topic:test3    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test3    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: test3    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: test3    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2

#Leader 指的是broker id    
#查看topic信息,能够看到 partition 0 的leader 在node2上,partition 1 的leader 在node3上,partition 2 的leader 在node1上     

#node1 上执行
[root@node2 kafka_2.12-1.0.2]# ll kafka-logs/
drwxr-xr-x. 2 root root 4096 2月  12 17:19 test3-0
drwxr-xr-x. 2 root root 4096 2月  12 17:19 test3-1
drwxr-xr-x. 2 root root 4096 2月  12 17:19 test3-2 # partition2 的 leader

#node2 上执行
[root@node2 kafka_2.12-1.0.2]# ll kafka-logs/
drwxr-xr-x. 2 root root 4096 2月  12 17:19 test3-0 # partition0 的 leader
drwxr-xr-x. 2 root root 4096 2月  12 17:19 test3-1
drwxr-xr-x. 2 root root 4096 2月  12 17:19 test3-2 

#node3 上执行
[root@node2 kafka_2.12-1.0.2]# ll kafka-logs/
drwxr-xr-x. 2 root root 4096 2月  12 17:19 test3-0 
drwxr-xr-x. 2 root root 4096 2月  12 17:19 test3-1 # partition1 的 leader
drwxr-xr-x. 2 root root 4096 2月  12 17:19 test3-2 

3.4.1 ISR与副本选择

所有的读写都发生在Leader上,Leader是否均匀分布会直接影响集群的运行效率。副本之后再ISR中才会被选为Leader。

  • kafka内部动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR。
  • 在这个集合的节点都是和leader保持高度一致的,任何一条消息必须被这个集合的每个节点读取并追加到日志了,才会通知外部这个消息已经被提交了。
  • 因此这个集合中的任何一个节都可以点随时被选为leader。ISR在zookeeper中维护。
    如果ISR中有n+1 个节点,那么就可以允许最大n个节点宕机,而不会造成消息丢失且能够提供正常服务。
    ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,它可以重新加入ISR。这种leader的选择方式是非常快速的,适合kafka的场景

如果所有节点都宕机了怎么办呢?

kafka对于数据不丢失的保证是基于至少一个节点是存活的情况下,一旦所有节点都宕机了,那么就无法保证了。

在实际应用中,当所有的ISR列表中的节点都宕机时必须及时做出反应,可以有以下两种方式:

1.等待ISR中的任何一个节点恢复并担任leader。

2.在所有存活的节点中选择一个节点作为leader(不只是在ISR中)。

这是一个在可用性和连续性之间的权衡,如果等待ISR节点恢复,一旦ISR中的节点无法启动或者数据丢失了,那集群就永远恢复不了了。如果选择ISR之外的节点恢复,那么这个节点的数据会被作为线上数据,有可能和真实数据有所出入,因为有些数据它可能还没同步到。这里涉及到了CAP理论中系统的可用性与一致性的问题,我们可以根据系统需求来在两种方式中做出选择。kafka目前默认选择了第二种策略。这种困境不只kafka会遇到,几乎所有的分布式系统都会遇到这种问题。

ISR相关配置:

  • server配置

replica.lag.time.max.ms = 10000 落后的时间是10s 超过十秒没发送ack请求则从isr中移除

replica.lag.max.message = 4000 数据相差4000条

  • Topic配置

min.insync.replicas = 1 保证最少有多少副本(replica)

3.4.2 首副本选举过程

  1. 脚本启动时会在 Zookeeper 上创建/admin/preferred_replica_election,并存入需要调整 Preferred Replica 的 Partition信息。
  2. Controller 一直Watch该节点,一旦该节点被创建,Controller会收到通知,并获取该节点内容。
  3. Controller 读取 Preferred Replica
    • 如果发现该Replica并非是所在Partition的 leader 并且它在该 Partition 的 ISR 中, Controller 向该Replica 发送LeaderAndIsrRequest,让该副本成为Leader。
    • 如果该副本当前并非是Leader,且不再ISR中,Controller为了保证不让数据丢失,并不会将其设置为Leader

3.4.3 Leader负载均衡的脚本与配置

  • 手动Leader负载均衡脚本

kafka-preferred-replica-election.sh

  • 可通过如下配置来启用和控制自动Leadership平衡
参数 描述
leader.imbalance.check.interval.seconds=300 每个300秒检查leader的负载均衡情况
leader.imbalance.per.broker.percentage=10 不平衡性超过阈值就自动触发负载均衡
auto.leader.rebalance.enable=true 默认是开启的

4 Segment

4.1 Segment 介绍

  • Partition 是一个大文件夹,Partition的数据被平均分配到多个大小相等的小文件中,这些小文件就是Segment。
  • Partition 相当于是一个队列,队列中的元素是Segment,消费时先从第0个Segment开始,新来的message保存在队列的末尾,Partition只需要支持顺序读写就行了。
  • Segment也相当于一个队列,队列元素是message,由offset标识用来区分message。从第一个message开始消费,新进来的message存在于Segment的末尾。
  • Kafka broker收到数据会向对应的Partition上的最后一个Segment上添加该消息。
  • Segment上的消息条数达到配置值或消息发布时间超过阈值时,Segment上的消息会被flush到磁盘,只有flush到磁盘上的消息consumer才能消费。
  • Segment达到一定的大小后将不会再向该Segment中写数据了,这时broker会创建新的Segment。
  • Segment files 消息数据量不一定相等,这种特性方便old Segment file被快速删除。
  • Segment文件生命周期由服务端配置参数决定。

Segment 相关配置:

参数 说明
log.segment.bytes 控制日志Segment文件的大小,超出该大小则追加到一个新的日志Segment文件中(-1表示没有限制),默认10G
log.roll.hours 当达到下面时间,会强制新建一个Segment
segment.index.bytes 对于Segment索引文件的大小限制,默认为10M

Segment示意图

Segment 在 Partition 中的结构大约如上图,这样做的好处是是在删除数据时只需删除指定的Segment文件即可,无需对数据进行删除,能够极大的提高磁盘利用率。

4.2 Segment 文件结构

Segment File由三个部分组成:

  • index file:Segment索引文件,前缀为上一个Segment的最后一个消息的偏移,后缀为 *.index
  • data file:Segment数据文件,前缀为上一个Segment的最后一个消息的偏移,后缀为 *.log
  • timeindex file:Segment时间索引文件,前缀为上一个Segment的最后一个消息的偏移,后缀为 *.timeindex(自0.10.0.1开始,Kafka为每个Topic分区增加了基于时间的索引文件)

Partition文件夹中还会有一个leader-epoch-checkpoint 文件,这个文件中保存了每一任Leader开始写入消息是的offset,会定时更新,Follower被选举为Leader时会根据这个确定哪些消息是可用的。

Segment File 文件命名规则:
Partition全局的第一个Segment从0 开始,后续每个Segment文件名为上一个Segment文件最后一条消息的Offset值。数值最大为64位Long大小,19位数字字符长度,没有数字用0填充。

[root@node1 ~]# cd /opt/kafka_2.12-1.0.2/kafka-logs/
[root@node1 test1-1]# ll -rth
总用量 21M
-rw-r--r--. 1 root root 10M 2月  10 16:46 00000000000000000000.index
# index 文件,索引文件
-rw-r--r--. 1 root root 10M 2月  10 16:46 00000000000000000000.timeindex
# time index 文件,时间索引文件
-rw-r--r--. 1 root root 216 2月  10 20:19 00000000000000000000.log
# 数据文件
-rw-r--r--. 1 root root   8 2月  10 16:51 leader-epoch-checkpoint
#文件中保存了每一任Leader开始写入消息是的offset,会定时更新,Follower被选举为Leader时会根据这个确定哪些消息是可用的。

Segment的名字是以Segment存储最小的offset的值命名的因为刚刚创建还没数据所所以最小的offset的名字是000000… 所以Segment的名字是000000…..

parition内的每条消息都有一个有序的id号,这个id号被称为偏移量(offset),它可以唯一确定每条消息在parition内的位置。即offset表示partiion的第多少message

4.3 通过 offset 进行数据定位

Segment 文件列表

例如以上图为例:读取offset=368776的message,需要通过下面2个步骤查找。

第一步查找Segment file:

  • 第一个文件 00000000000000000000.index的消息起始偏移量(offset)为0。(最开始的文件)
  • 第二个文件 00000000000000368769.index的消息起始偏移量为368770 = 368769 + 1
  • 第三个文件 00000000000000737337.index的起始偏移量为737338=737337 + 1

其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log

第二步通过Segment file查找message:

  • 通过第一步定位到Segment file,当offset=368776时,
  • 依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,
  • 然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

Segment中index<—->data file对应关系物理结构

上述图中索引文件存储大量元数据,数据文件存储大量消息数据,索引文件中元数据指向对应数据文件中message的物理偏移地址。 其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。

4.4 timeindex

在消息中增加了一个时间戳字段和时间戳类型。目前支持的时间戳类型有两种:

  • CreateTime :表示producer创建这条消息的时间
  • LogAppendTime :表示broker接收到这条消息的时间(严格来说,是leader broker将这条消息写入到log的时间)
参数 说明
log.message.timestamp.type 时间戳类型
index.interval.bytes 索引项间隔由

4.4.1 如何使用时间戳?

Kafka broker config提供了一个参数:log.message.timestamp.type来统一指定集群中的所有Topic使用哪种时间戳类型。用户也可以为单个Topic设置不同的时间戳类型,具体做法是创建Topic时覆盖掉全局配置:

bin/kafka-topics.sh 
--zookeeper localhost:2181 
--create 
--topic test 
--partitions 1 
--replication-factor 1 
--config message.timestamp.type=LogAppendTime

另外, producer在创建ProducerRecord时可以指定时间戳:

record = new ProducerRecord<String, String>("my-topic", null, System.currentTimeMillis(), "key", "value");

4.4.2 基于时间戳的功能

1 根据时间戳来定位消息:之前的索引文件是根据offset信息的,从逻辑语义上并不方便使用,引入了时间戳之后,Kafka支持根据时间戳来查找定位消息
2 基于时间戳的日志切分策略
3 基于时间戳的日志清除策略

4.4.3 基于时间戳的消息定位

具体的格式是时间戳+位移
时间戳记录的是该日志段当前记录的最大时间戳
位移信息记录的是插入新的索引项时的消息位移信息
该索引文件中的每一行元组(时间戳T,位移offset)表示:该日志段中比T晚的所有消息的位移都比offset大。
由于创建了额外的索引文件,所需的操作系统文件句柄平均要增加1/3(原来需要2个文件,现在需要3个),所以有可能需要调整文件句柄的参数。

4.4 kafka 数据的构成

构成 长度 说明
offset 8 byte 消息偏移量
message size 4 byte message长度
CRC32 4 byte 用crc32校验message
magic 1 byte 本次发布Kafka服务程序协议版本号
attributes 1 byte 独立版本、或标识压缩类型、或编码类型。
key length 4 byte key的长度,当key为-1时,K byte key字段不填
key K byte key value(可选)
payload value bytes 消息数据

5. 数据压缩

在某些情况下,整个应用的瓶颈不在于CPU或者磁盘,而是受网络带宽的影响。当然你可以选择在业务代码中对每一条消息做压缩处理,之后再发送到kafka中,之后业务消费端再进行解压处理,这种方式对应消息的压缩效率是非常低。而真正有效的压缩是对一批消息进行压缩而不是单独的为每条消息进行压缩。

Kafka本身可以支持几种类型的压缩,比如gzip和snappy,更高的版本还支持lz4。默认是none,即不采用任何压缩。开启压缩的方式是在客户端调用的时候设置producer的参数。与压缩有关的参数有:

名称 默认值 在哪使用 说明
compression.type none new producer configs(kafka-client) 生产者生成的所有数据的压缩类型。 默认值为无(即不压缩)。 有效值为none,gzip或snappy。 压缩涉及全部批次的数据,因此,批处理的效率也会影响压缩率(批处理越多意味着压缩效果越好)。
compression.codec none kafka-scala-client 此参数允许您为此生产者生成的所有数据指定压缩编解码器。 有效值为“ none”,“ gzip”和“ snappy”。
compressed.topics null kafka-scala-client 此参数允许您设置是否应为特定主题打开压缩。 如果压缩编解码器不是NoCompressionCodec,则仅对指定主题启用压缩(如果有)。 如果压缩主题列表为空,则为所有主题启用指定的压缩编解码器。 如果压缩编解码器为NoCompressionCodec,则对所有主题禁用压缩

上面表格中提及了kafka-client与kafka-scala-client,这两者之间有什么区别呢?

  • kafka-client:kafka的一个分支,其全部使用java语言来开发kafka的客户端。
  • kafka-scala-client:scala语言开发的客户端。

两者之间采用的参数会有所不同,注意区分。

下面演示两个demo来便于区分两者之间的用法。

  1. kafka-client的使用demo
Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("compression.type", "gzip");

        Producer<String,byte[]> producer = new KafkaProducer<String,byte[]>(properties);

        ProducerRecord<String,byte[]> producerRecord = new ProducerRecord<String,byte[]>(topic, "messages".getBytes());
        Future<RecordMetadata> future =  producer.send(producerRecord, new Callback() 
        {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                System.out.println(metadata.offset());
            }
        });
  1. kafka-scala-client的使用demo

Properties props = new Properties();
props.put("serializer.class", "com.kafka.compression.kafka.MessageEncoder");
props.put("metadata.broker.list", brokerList);
props.put("producer.type", "async");
props.put("compression.codec", "gzip");

kafka.javaapi.producer.Producer<Integer, byte[]> producer = new kafka.javaapi.producer.Producer<Integer,
        byte[]>(new kafka.producer.ProducerConfig(props));
producer.send(new KeyedMessage<Integer, byte[]>(topic, "messages".getBytes()));

6. zookeeper

kafka 在zookeeper中的元数据结构

参考文档:

https://www.infoq.cn/article/kafka-analysis-part-3


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Kafka总结(三)配置参数说明 Kafka总结(三)配置参数说明
1. Broker 相关配置 Name Description zookeeper.connect zookeeper集群的地址,逗号分割 advertised.host.name 已弃用:仅在未设
2020-02-12
下一篇 
Kafka总结(二)常见组件上 Broker 、Producer和 Consumer 介绍 Kafka总结(二)常见组件上 Broker 、Producer和 Consumer 介绍
1. Broker1.1 Broker 介绍 Broker 没有副本机制,一旦 Broker 宕机,该 Broker 的消息都将不可用。 Broker 不会保存 Consumer 消费 topic partition offset 的状态
2020-02-09
  目录