测试环境:三台 kafka 机器连接而成的集群。
主题相关
参数列表:
| 参数 | 描述 |
|---|---|
| –bootstrap-server | 必需使用的参数,表示连接的 Kafka Broker 主机名称和端口号 |
| –topic | 指定操作的 topic 的名称 |
| –create | 创建主题 |
| –delete | 删除主题 |
| –alter | 修改主题 |
| –list | 查看所有主题 |
| –describe | 查看主题详细描述 |
| –partitions | 设置分区数 |
| –replication-factor | 设置分区副本,分区副本的数量不能超过 broker 的数量 |
| –config | 更新系统默认的配置 |
-
创建主题
bashkafka-topics.sh --bootstrap-server "主机名":9092 --topic "topic名称" --create比如创建一个 music 和 sports 主题:
bash[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic music --createCreated topic music.[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic sports --createCreated topic sports. -
查看所有主题
bash[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --listmusicsports -
查看某个主题详细信息
bashkafka-topics.sh --bootstrap-server "主机名":9092 --topic "topic名称" --describe比如查看 music 主题的详细信息:
bash[root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic music --describeTopic: music TopicId: _8aVphZhQgKuUVsuW7XONQ PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: music Partition: 0 Leader: 3 Replicas: 3 Isr: 3下面是有关信息的详细解读:
- Topic:主题名称,这里是 “music”。
- TopicId:主题的唯一标识符,用于区分不同的主题。
- PartitionCount: 分区数量,这里的值是 1。在 Kafka 中,主题被分割成多个分区,这可以并行处理数据,提高吞吐量。每个分区都是一个有序的、不可变的消息日志。分区数也决定了可以拥有的最大并行消费者数量;每个消费者组中的消费者通常处理一个或多个分区的消息。
- ReplicationFactor: 副本数量,这里的值是 1。这意味着每个分区有 1 个副本,即每个消息只存储一次。副本是分区的一个备份,Kafka 用它来实现高可用性和故障恢复。
- Configs: 主题的配置设置。在这个例子中,没有显示具体的配置,通常这里会列出对此主题进行的特定配置,如消息的保留策略、最大消息大小等。
接下来是关于分区 0 的详细信息:
- Partition:分区号,这里是 0。这是主题内的分区索引,分区号从 0 开始。
- Leader:领导者副本的编号,这里是 3。在 Kafka 的每个分区中,有一个副本被选为领导者。只有领导者副本可以处理对分区的读写请求。其他副本只在后台同步数据。
- Replicas:存储这个分区副本的 BrokerID 列表,这里显示 ID 为 3,这意味着分区 0 的数据只存储在 BrokerID 为 3 的节点上。
- Isr:表示与领导者副本保持同步的副本列表。在这个例子中,只有 BrokerID 为 3 在这里,这意味着 BrokerID 为 3 的节点是领导者副本,且是唯一与领导者保持同步的副本。这是因为,在副本数量为 1 的情况下,领导者也是唯一的同步副本,因为没有其他副本可供同步。
-
修改主题
bashkafka-topics.sh --bootstrap-server "主机名":9092 --topic "topic名称" --alter "要修改的相关配置"从上面可以看出 music 主题的 Partition 的编号为 0,说明只有一个分区。现在修改 “music” 主题的分区数量,并查看该主题的详细信息:
text[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic music --alter --partitions 3[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic music --describeTopic: music TopicId: _8aVphZhQgKuUVsuW7XONQ PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: music Partition: 0 Leader: 3 Replicas: 3 Isr: 3 Topic: music Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: music Partition: 2 Leader: 2 Replicas: 2 Isr: 2注意:- 一个主题在生成后就不能通过简单命令的方式来改变其副本的数量(需要通过配置文件),所以最好是在主题创建时就指定其副本数量。
- 主题的分区只能增加,不能减少。
下面是一个多副本多分区的例子:
bash[root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic blog --create --replication-factor 3Created topic blog.[root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic blog --describeTopic: blog TopicId: 4TfNTnfcReWD_LsyDJFArQ PartitionCount: 1 ReplicationFactor: 3 Configs: Topic: blog Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1[root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic blog --alter --partitions 3[root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic blog --describeTopic: blog TopicId: 4TfNTnfcReWD_LsyDJFArQ PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: blog Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: blog Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1 Topic: blog Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2 -
删除主题
bashkafka-topics.sh --bootstrap-server "主机名":9092 --topic "topic名称" --delete现在删除 sports 主题,并列出所有主题:
bash[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic sports --delete[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --listblogmusic
生产者和消费者
-
消费者消费一条消息
bashkafka-console-consumer.sh --bootstrap-server "主机名":9092 --topic "topic名称"比如需要消费 music 主题当中的消息:
bash[root@centos kafka]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic music -
生产者产生一条消息
bash# 如果消费者没有指定 group id,则会启动一个随机的组kafka-console-producer.sh --bootstrap-server "主机名":9092 --topic "topic名称"比如在 mucis 主题当中生产一条消息:
bash[root@centos kafka]# kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic music>hello kafka现在消费者也能接收到消息:
bash[root@centos kafka]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic musichello kafka -
自己指定消费者组的名称
bashkafka-console-consumer.sh --bootstrap-server "主机名":9092 --topic music --group "消费者组名"比如产生一个消费者,组名为:myGroupName
bash[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic music --group myGroupName[root@kafka2 ~]# kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --listmyGroupName -
其它
bash# 从文件中生产消息到 Kafka topic。文件的相对路径是相对于执行当前命令的路径kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic music < topic-input.txt# 生产键值对消息,并以:符号作为分割kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic music \ --property parse.key=true --property key.separator=:>name:bing>age:25# 消费 Kafka topic 中的键值对消息,并同时显示键和值(默认只显示值)kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic music \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true# 从头消费:Kafka topic中的消息kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic music --from-beginning
消费者组
-
查看所有消费者组
bashkafka-consumer-groups.sh --bootstrap-server "主机名":9092 --list比如查看所有消费者(不一定是活跃的):
bash[root@kafka1 ~]# kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --listconsole-consumer-49140 -
查看某个消费者组的详情
bashkafka-consumer-groups.sh --bootstrap-server "主机名":9092 --describe --group "消费者组名"
-
删除消费者组
bashkafka-consumer-groups.sh --bootstrap-server "主机名":9092 --delete --group "消费者组名"比如删除消费者组:console-consumer-49140
bash[root@kafka1 ~]# kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --delete --group console-consumer-49140Deletion of requested consumer groups ('console-consumer-49140') was successful.