初识 kafka 消息队列

标签:kafka首次发布:2024-04-30最近修改:2024-06-14

消息队列使用场景

1. 流量削锋

由于前端和后端的流量处理能力不一样,如何避免过多的请求压垮秒杀系统呢?

解决思路:使用消息队列隔离网关后端服务,以达到流量控制和保护后端服务的目的。

image-20231224143008380

假如消息队列之后,整个秒杀的流程:

  • 网关收到请求后,将请求放入到 MQ 中。
  • 后端服务从请求 MQ 获取请求,完成后续秒杀处理过程,返回响应。

image-20231224143100363

优点:

  1. 流量平滑:消息队列可以缓冲瞬时的高流量,使后端处理更平稳。
  2. 异步处理:允许系统异步处理请求,提高用户体验,避免直接等待。
  3. 系统解耦:消息队列降低了系统组件之间的耦合度,易于维护和扩展。

缺点:

  1. 增加复杂性:引入消息队列会增加系统架构的复杂性。
  2. 处理延时:消息队列引入了额外的处理延时,可能影响实时性。
  3. 系统监控:需要额外的监控和维护消息队列的健康状况。

2. 异步处理

在新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。

image-20231224150156923

3. 服务解耦

如果库存系统出现问题,会导致订单系统下单失败。而且如果库存系统接口修改了,会导致订单系统也无法工作。

使用消息队列可以实现系统与系统之间的解耦,订单系统不再调用库存系统接口,而是把订单消息写入到消息队列。库存系统从消息系统中拉取消息,然后再减库存,从而实现系统解耦。

image-20231224150357142

两种消息队列模型

P2P 模型

点对点(Point-to-Point)模型当中,有如下特点:

  • 每个消息只有一个消费者(即消息一旦被消费,就从队列中移除);如果有多个消费者,消息队列将以某种方式(如轮询或优先级)将消息分发给它们,但每个消息仍然只被处理一次。
  • 接收者在成功接收消息并且完成消费后,必需向队列发送一个应答消息以确认成功处理了消息。消息应答的作用是告知消息队列系统消息已经被成功处理,可以从队列中移除。
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

适用场景:

  • 需要确保消息顺序处理的场合。
  • 严格的消息处理,确保消息不会丢失,每个消息只被处理一次。
  • 系统间的点对点或请求/响应模式的通信。

PS 模型

发布/订阅(Publish/Subscribe)模型当中,有如下特点:

  • 消息被发送到一个主题,可以有多个消费者订阅一个主题,当消息发送到主题时,所有订阅者都会收到消息(即一对多的通信)。
  • 发布者和订阅者之间有时间上的依赖性。针对某个消费者,它必须订阅某个主题后才能消费发布者发布的相应主题的消息。

适用场景:

  • 适合事件通知和更新,比如股票价格更新或新闻报道。
  • 当消息需要广播到多个消费者时。
  • 适合松耦合的系统架构,生产者不需要知道谁是消息的消费者。

安装 kafka 消息队列(单机版)

1. 安装 zookeeper

官网:zookeeper (apache.org)。这里选择当前最新的 3.9.2 版本(zookeeper-3.9.2)下载即可。命令如下:

bash
# 下载wget https://archive.apache.org/dist/zookeeper/zookeeper-3.9.2/apache-zookeeper-3.9.2-bin.tar.gz# 解压tar -xvzf apache-zookeeper-3.9.2-bin.tar.gz -C /usr/local/mv apache-zookeeper-3.9.2-bin/ zookeeper/

设置环境变量

bash
vim /etc/profile# 添加如下内容export ZOOKEEPER_HOME=/usr/local/zookeeperexport PATH=$PATH:$ZOOKEEPER_HOME/bin# 重新编译使环境变量生效source /etc/profile

修改配置文件:

bash
# 切换到conf目录下cd /usr/local/zookeeper/conf# 复制zoo_sample.cfg到zoo.cfgcp zoo_sample.cfg zoo.cfg# 打开zoo.cfgvim zoo.cfg# 修改配置如下:# 存放数据dataDir=/usr/local/zookeeper/data# 存放日志文件dataLogDir=/usr/local/zookeeper/log

启动

bash
# 启动命令zkServer.sh start# 停止命令zkServer.sh stop# 查看状态zkServer.sh status

设置为系统服务并使用开机自启动:

  1. 编辑 zookeeper.service 文件

    text
    vim /usr/lib/systemd/system/zookeeper.service
  2. 内容如下:

    bash
    [Unit]Description=zookeeperAfter=network.target remote-fs.target nss-lookup.target[Service]Type=forkingExecStart=/usr/local/zookeeper/bin/zkServer.sh startExecReload=/usr/local/zookeeper/bin/zkServer.sh restartExecStop=/usr/local/zookeeper/bin/zkServer.sh stop[Install]WantedBy=multi-user.target
  3. 重新加载 Systemd 配置:

    text
    systemctl daemon-reload
  4. 启动和开机自启动:

    text
    # 设置开机自启动systemctl enable zookeeper# 启动zookeepersystemctl start zookeeper

2. 安装 kafka

这里安装最新版:kafka_2.13-3.6.2.tgz。命令如下:

bash
# 下载wget https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz# 解压tar -xvzf kafka_2.13-3.6.2.tgz -C /usr/localmv /usr/local/kafka_2.13-3.6.2 /usr/local/kafka

3. 配置环境变量

bash
vim /etc/profile# 加入如下内容:export KAFKA_HOME=/usr/local/kafkaexport PATH=$PATH:$KAFKA_HOME/binsource /etc/profile

4. 修改配置文件

text
cd /usr/local/kafka/configvim server.properties# 内容修改如下:broker.id=0log.dirs=/usr/local/kafka/logszookeeper.connect=localhost:2181

server.properties是 Kafka Broker 的主要配置文件,包含了 Kafka Broker 的各种配置选项,如监听端口、日志目录、副本数等。

  1. broker.id=0:
    • 这个配置项定义了 Kafka Broker 的唯一标识符。每个 Kafka Broker 都必须具有唯一的 ID,以便集群中的其他 Broker 和客户端能够识别它。
    • broker.id 是必须指定的配置项,无论是在单机环境还是多节点集群环境中。
  2. log.dirs=/usr/local/kafka/logs:
    • 这个配置项定义了 Kafka Broker 用于存储消息日志的目录路径。Kafka 使用消息日志来持久化消息,以确保消息的持久性和可靠性。
    • 在这个配置项中,可以指定一个或多个目录,多个目录之间使用逗号分隔。如果指定了多个目录,Kafka Broker 将会在这些目录中以循环方式写入消息,以提高性能和分布式存储。
  3. zookeeper.connect=192.168.11.10:2181:
    • 这个配置项定义了 Kafka Broker 用于连接 ZooKeeper 服务的地址。ZooKeeper 是 Kafka 用于集群协调和元数据管理的关键组件。

5. 启动和停止

bash
# 启动kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties# 查看是否启动成功jps# 停止kafka-server-stop.sh

kafka 基本概念和架构

整体架构

  • Broker(服务代理节点):在 Kafka 集群中,一个 Kafka 服务器就是一个 Broker。

  • Producer(生产者):将事件发布(写入)到 Kafka 的客户端应用程序,根据某种规则向 Kafka 的一个 Topic 的特定分区的 Leader 副本发布消息。

  • Consumer(消费者):订阅(读取和处理)这些消息的客户端应用程序,消费或订阅某个 Topic 主题里的消息。消费者只需订阅 Topic,无需关注 Kafka 集群内实例对 Partition 的分配。

image-20240427191321622

主题和分区

  • Record(消息):Kafka 中的元数据或者说一条消息的示例结构如下:

    • key(键): “Alice”
    • value(值): “Made a payment of $200 to Bob”
    • timestamp(时间戳): “Jun. 25, 2020 at 2:06 p.m.”
  • Topic(主题):Kafka 中的消息以 topic 为单位进行归类,生产者负责将消息发送到特定的 topic,而消费者负责订阅 topic 并进行消费。

  • Partition(分区):topic 是一个逻辑上的概念,它还可以细分为多个分区。每个分区都是一个独立的日志文件,用于存储该分区内的消息。因此,同一主题下的不同分区包含的消息是不同的。

  • offset(偏移量):消息在被追加到分区日志文件的时候都会分配一个特定的 offset。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区内部有序而不是主题有序。

主题和分区的示意图如下:

image-20240427193637753

消费者组

  • Consumer Group(消费者组)
    • 每个 Consumer 属于一个特定的 Consumer Group,多个 Consumer 可以属于同一个 Consumer Group。
    • kafka 消息存放在 Partition 的日志中,同一条消息可以被不同消费者组消费。
    • 但在同一个消费者组中,Partition 内的消息只由组内的某一个消费者实例绑定消费。
    • 当消费者组中消费者实例初次连接 Kafka 时,分配 Partition,Consumer 向 Kafka 发送心跳检测,后续超过心跳周期将默认离线。会触发 rebalance,将该 Partition 重新分配给消费者组中的其他消费者实例。

消费者组、主题和分区的关系如下:

image-20240427194448582

多副本

  • Replication(副本):每个 Partition 分区可以有多个副本,分布在不同的 Broker 上。副本之间是一主多从的关系。

  • Leader(领导者):每个 Partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 Partition。

  • Follower(跟随者):Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。如果 Leader 失效,则从 Follower 中选举出一个新的 Leader。

服务代理节点、主题、分区和副本之间的关系如下:

image-20240427195427060