kafka 常见命令

标签:kafka首次发布:2024-04-30最近修改:2024-06-13

测试环境:三台 kafka 机器连接而成的集群。

主题相关

参数列表:

参数 描述
–bootstrap-server 必需使用的参数,表示连接的 Kafka Broker 主机名称和端口号
–topic 指定操作的 topic 的名称
–create 创建主题
–delete 删除主题
–alter 修改主题
–list 查看所有主题
–describe 查看主题详细描述
–partitions 设置分区数
–replication-factor 设置分区副本,分区副本的数量不能超过 broker 的数量
–config 更新系统默认的配置
  1. 创建主题

    bash
    kafka-topics.sh --bootstrap-server "主机名":9092 --topic "topic名称"  --create

    比如创建一个 music 和 sports 主题:

    bash
    [root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic music --createCreated topic music.[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic sports --createCreated topic sports.
  2. 查看所有主题

    bash
    [root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --listmusicsports
  3. 查看某个主题详细信息

    bash
    kafka-topics.sh --bootstrap-server "主机名":9092 --topic "topic名称" --describe

    比如查看 music 主题的详细信息:

    bash
    [root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic music --describeTopic: music    TopicId: _8aVphZhQgKuUVsuW7XONQ PartitionCount: 1       ReplicationFactor: 1    Configs:        Topic: music    Partition: 0    Leader: 3       Replicas: 3     Isr: 3

    下面是有关信息的详细解读:

    • Topic:主题名称,这里是 “music”。
    • TopicId:主题的唯一标识符,用于区分不同的主题。
    • PartitionCount: 分区数量,这里的值是 1。在 Kafka 中,主题被分割成多个分区,这可以并行处理数据,提高吞吐量。每个分区都是一个有序的、不可变的消息日志。分区数也决定了可以拥有的最大并行消费者数量;每个消费者组中的消费者通常处理一个或多个分区的消息。
    • ReplicationFactor: 副本数量,这里的值是 1。这意味着每个分区有 1 个副本,即每个消息只存储一次。副本是分区的一个备份,Kafka 用它来实现高可用性和故障恢复。
    • Configs: 主题的配置设置。在这个例子中,没有显示具体的配置,通常这里会列出对此主题进行的特定配置,如消息的保留策略、最大消息大小等。

    接下来是关于分区 0 的详细信息:

    • Partition:分区号,这里是 0。这是主题内的分区索引,分区号从 0 开始。
    • Leader:领导者副本的编号,这里是 3。在 Kafka 的每个分区中,有一个副本被选为领导者。只有领导者副本可以处理对分区的读写请求。其他副本只在后台同步数据。
    • Replicas:存储这个分区副本的 BrokerID 列表,这里显示 ID 为 3,这意味着分区 0 的数据只存储在 BrokerID 为 3 的节点上。
    • Isr:表示与领导者副本保持同步的副本列表。在这个例子中,只有 BrokerID 为 3 在这里,这意味着 BrokerID 为 3 的节点是领导者副本,且是唯一与领导者保持同步的副本。这是因为,在副本数量为 1 的情况下,领导者也是唯一的同步副本,因为没有其他副本可供同步。
  4. 修改主题

    bash
    kafka-topics.sh --bootstrap-server "主机名":9092 --topic "topic名称" --alter "要修改的相关配置"

    从上面可以看出 music 主题的 Partition 的编号为 0,说明只有一个分区。现在修改 “music” 主题的分区数量,并查看该主题的详细信息:

    text
    [root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic music --alter --partitions 3[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic music --describeTopic: music    TopicId: _8aVphZhQgKuUVsuW7XONQ PartitionCount: 3       ReplicationFactor: 1    Configs:        Topic: music    Partition: 0    Leader: 3       Replicas: 3     Isr: 3        Topic: music    Partition: 1    Leader: 1       Replicas: 1     Isr: 1        Topic: music    Partition: 2    Leader: 2       Replicas: 2     Isr: 2

    注意

    • 一个主题在生成后就不能通过简单命令的方式来改变其副本的数量(需要通过配置文件),所以最好是在主题创建时就指定其副本数量。
    • 主题的分区只能增加,不能减少。

    下面是一个多副本多分区的例子:

    bash
    [root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic blog --create --replication-factor 3Created topic blog.[root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic blog --describeTopic: blog     TopicId: 4TfNTnfcReWD_LsyDJFArQ PartitionCount: 1       ReplicationFactor: 3    Configs:        Topic: blog     Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1[root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic blog --alter --partitions 3[root@kafka1 ~]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic blog --describeTopic: blog     TopicId: 4TfNTnfcReWD_LsyDJFArQ PartitionCount: 3       ReplicationFactor: 3    Configs:        Topic: blog     Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1        Topic: blog     Partition: 1    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1        Topic: blog     Partition: 2    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
  5. 删除主题

    bash
    kafka-topics.sh --bootstrap-server "主机名":9092 --topic "topic名称" --delete

    现在删除 sports 主题,并列出所有主题:

    bash
    [root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --topic sports --delete[root@centos kafka]# kafka-topics.sh --bootstrap-server kafka1:9092 --listblogmusic

生产者和消费者

  1. 消费者消费一条消息

    bash
    kafka-console-consumer.sh --bootstrap-server "主机名":9092 --topic "topic名称"

    比如需要消费 music 主题当中的消息:

    bash
    [root@centos kafka]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic music
  2. 生产者产生一条消息

    bash
    # 如果消费者没有指定 group id,则会启动一个随机的组kafka-console-producer.sh --bootstrap-server "主机名":9092 --topic "topic名称"

    比如在 mucis 主题当中生产一条消息:

    bash
    [root@centos kafka]# kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic music>hello kafka

    现在消费者也能接收到消息:

    bash
    [root@centos kafka]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic musichello kafka
  3. 自己指定消费者组的名称

    bash
    kafka-console-consumer.sh --bootstrap-server "主机名":9092 --topic music --group "消费者组名"

    比如产生一个消费者,组名为:myGroupName

    bash
    [root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic music --group myGroupName[root@kafka2 ~]# kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --listmyGroupName
  4. 其它

    bash
    # 从文件中生产消息到 Kafka topic。文件的相对路径是相对于执行当前命令的路径kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic music < topic-input.txt# 生产键值对消息,并以:符号作为分割kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic music \	--property parse.key=true --property key.separator=:>name:bing>age:25# 消费 Kafka topic 中的键值对消息,并同时显示键和值(默认只显示值)kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic music \	--formatter kafka.tools.DefaultMessageFormatter \	--property print.key=true \	--property print.value=true# 从头消费:Kafka topic中的消息kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic music --from-beginning

消费者组

  1. 查看所有消费者组

    bash
    kafka-consumer-groups.sh --bootstrap-server "主机名":9092 --list

    比如查看所有消费者(不一定是活跃的):

    bash
    [root@kafka1 ~]# kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --listconsole-consumer-49140
  2. 查看某个消费者组的详情

    bash
    kafka-consumer-groups.sh --bootstrap-server "主机名":9092 --describe --group "消费者组名"

    image-20240430164345319

  3. 删除消费者组

    bash
    kafka-consumer-groups.sh --bootstrap-server "主机名":9092 --delete --group "消费者组名"

    比如删除消费者组:console-consumer-49140

    bash
    [root@kafka1 ~]# kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --delete --group console-consumer-49140Deletion of requested consumer groups ('console-consumer-49140') was successful.