跳到主要内容

Kafka 简单使用

· 阅读需 4 分钟

省去了一些细节,仅供参考。

安装

  1. 访问 Apache Kafka 官方网站 https://kafka.apache.org/downloads 并下载最新版本并解压。
  2. kafka_2.13-3.5.1\bin\windows 添加到系统变量。

启动

进入 windows 文件夹,分别启动 ZooKeeper 和 Kafka,执行命令:

.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties

创建主题

在 Kafka 中,每个 Kafka 实例称为 Broker,每个 Broker 中可以保存多个 Topic。每个 Topic 可以划分为多个分区,每个分区保存的数据是不一样的,这些分区可以在同一个 Broker 中,也可以在散布在不同的 Broker 中。

执行以下命令创建主题:

kafka-topics --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1
  • --bootstrap-server: 配置连接到一个 Kafka 实例;
  • --partitions:分区数量,将该主题划分成多少个分区;
  • --replication-factor:副本数量,表示每个分区一共有多少个副本;副本数量需要小于或等于 Broker 的数量;
  • --topic: 指定创建的主题名

生产者生产消息:

kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic

消息者使用消息,复制标签页,执行命令:

kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic

查看所有主题:

kafka-topics --bootstrap-server localhost:9092 --list

查看主题的详细信息:

kafka-topics --describe --bootstrap-server localhost:9092 --topic first_topic

消费组

Consumer Groups 是 Kafka 用来组织 Consumer 的一种机制。属于同一 Consumer Group 的 Consumer 会竞争消费同一个 Topic 的消息。每个消息只会被该 Consumer Group 中的一个 Consumer 消费一次。一般来说,一个消费者组的消费者数量跟分区数量一致最好,这样每个消费者可以消费一个分区。过多的消费者会导致部分消费者不能消费消息,过少的消费者会导致单个消费者需要处理多个分区的消息。

对于消费者组来说,我们需要关注以下参数:

  • state:消费者组的状态;
  • members:消费者组成员;
  • offsets: ACK 偏移量;

查看所有的消费者组:

kafka-consumer-groups --bootstrap-server localhost:9092 --list

kafka-console-consumer 从指定主题接收消息,如果不指定 --group,脚本会自动为我们创建一个消费者组:

kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --group hello-group

在 C# 中使用 Kafka

文档:https://github.com/confluentinc/confluent-kafka-dotnet

安装客户端:

dotnet add package -v 2.4.0 Confluent.Kafka

在 Go 中使用 Kafka

Unix/Linux 环境下不必说了,主要说下 Windows 环境下的使用。首先需要下载 msys2, 下载安装完成之后,在弹出的终端中安装 mingw-w64-x86_64-toolchain 工具链:

pacman -S --needed base-devel mingw-w64-x86_64-toolchain mingw-w64-ucrt-x86_64-toolchain

--needed:仅安装那些尚未安装的包。如果包已经安装,则跳过,不会重新安装。

全部安装完成之后,把 mingw-w64 的安装目录 C:\msys64\mingw64\bin 加入用户 Path 环境变量里。

然后在项目中引入 confluent-kafka-go:

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

RabbitMQ

RabbitMO 的整体架构及核心概念:

  • virtual-host: 虚拟主机,起到数据隔离的作用
  • publisher: 消息发送者
  • consumer: 消息的消费者
  • queue: 队列,存储消息,队列仅受主机的内存和磁盘限制,它本质上是一个大型消息缓冲区。
  • exchange: 交换机,负责路由消息到队列

交换机需要绑定队列,才能向队列发送消息。

通过创建用户,可以给用户创建一个新的虚拟主机,不同虚拟主机之间数据是隔离的。