Kafka 副本数量调整

2023/09/26

Kafka 创建时未指定多个副本或者副本数量过少,都可以在后期手动添加,另外如果副本过多也可以减少,当前调整基于 Kafka 的版本是 2.5.1,但是估计 2.1 ~ 2.5 应该都是兼容的。

下面先来操作一下 Topic 副本减少的过程,首先查看 Kafka Topic 的详情:

# 新版本的 Kafka 建议使用 --bootstrap-server 不建议再使用 --zookeeper
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic

比如输出如下:

Topic: test-topic     PartitionCount: 4       ReplicationFactor: 2    Configs: cleanup.policy=delete
        Topic: test-topic     Partition: 0    Leader: 1002    Replicas: 1002,1004     Isr: 1002,1004
        Topic: test-topic     Partition: 1    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
        Topic: test-topic     Partition: 2    Leader: 1003    Replicas: 1003,1001     Isr: 1001,1003
        Topic: test-topic     Partition: 3    Leader: 1004    Replicas: 1004,1003     Isr: 1003,1004

然后我们看每个 partition 其实是有两个副本的,我们接下来准备缩减一下,那么我们尽量只留 Leader 副本就可以了,干掉 Follower 副本,我们首先编写下面的配置文件:

{"version":1,
  "partitions":[
     {"topic":"test-topic","partition":0,"replicas":[1002]},
     {"topic":"test-topic","partition":1,"replicas":[1001]},
     {"topic":"test-topic","partition":2,"replicas":[1003]},
     {"topic":"test-topic","partition":3,"replicas":[1004]}
]}

然后将上面的 JSON 保存到 reassign.json 文件,这个配置相当于每个分区只保留一个 Leader 副本,然后我们执行命令减少副本:

# 注意这里需要指定 ZooKeeper 地址,2.5 之前的都是这样,最新的好像是支持 --bootstrap-server 参数
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute

执行完成之后,Kafka 会给我们提示开始执行,然后我们可以用下面的命令查看进度:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --verify

等进度完成后,我们再次查看 Topic 的详情:

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic

这个时候分区就减少了:

Topic: test-topic     PartitionCount: 4       ReplicationFactor: 1    Configs: cleanup.policy=delete,segment.bytes=1073741824
        Topic: test-topic     Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
        Topic: test-topic     Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test-topic     Partition: 2    Leader: 1003    Replicas: 1003  Isr: 1003
        Topic: test-topic     Partition: 3    Leader: 1004    Replicas: 1004  Isr: 1004

然后这个时候分区的底层文件会先标记为删除,并在稍后进行清理。正常情况下所有的生产者和消费者都不会受到影响而导致中断。

然后我们还可以增加分区的副本数量,同样需要编写文件如下:

{"version":1,
  "partitions":[
     {"topic":"test-topic","partition":0,"replicas":[1002, 1004]},
     {"topic":"test-topic","partition":1,"replicas":[1001, 1002]},
     {"topic":"test-topic","partition":2,"replicas":[1003, 1001]},
     {"topic":"test-topic","partition":3,"replicas":[1004, 1003]}
]}

Kafka 会使用 replicas 下的第一个 broker id 作为 Leader 副本,其余的作为 Follower 副本,所以我们尽量保证现有的 Leader 和变化后的 Leader 一致,这样不会由于 Rebalance 导致消费者阻塞,从而保证服务不中断,如果后期确实分配不均再考虑调整,然后将上面 JSON 保存成文件开始操作:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute

然后查看进度:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --verify

等执行完,再次查看 Topic 详情,确定 Isr 中包含所有的副本就说明已经同步了。

然后在当前状态下,还可以让 Kafka 给我们生成比较好的副本方案,这样比较均衡,我们可以创建一个包含 Topic 的文件:

{
  "topics": [
    {
      "topic": "test-topic"
    }
  ],
  "version": 1
}

这个文件可以指定多个 Topic,然后我们保存文件为:topic-generate.json,并让 Kafka 给我们生成分配方案:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --topics-to-move-json-file topic-generate.json --broker-list "1001,1002,1003,1004" --generate

注意上面 –broker-list 要输入集群所有的 broker id,一定不要漏,然后执行完成 Kafka 就会输出一个基于当前副本数量的分配方案,前面是当前的,后面是 Kafka 为我们生成的,然后我们可以保存后面的 JSON 文件然后执行它即可。

还有一种比较常用的场景是 Kafka 集群添加了一些节点,但是之前的分区没有重新分配,我们这个时候也需要这样来执行重分配,首先也是按照上面的命令指定全部的 broker id,包括新添加的节点,然后生成分配的方案,生成后执行即可。

但是需要注意的是,如果我们原来的分区数量就比较少,比如每个机器只有一个分区,那么这个时候重分配无论如何也无法均衡,我们需要调用 Kafka 命令手动添加分区,然后分区数量足够的条件下再执行重分配即可。当然假如我们开始的分区比较多,那么直接重新分配也是没问题的,并且可以同时调整分区跟副本。另外还有一些技巧,比如尽量保证 Leader replica 前后不变,可以做到尽量不影响生产者和消费者的运行等。