sss

Kafka介绍

Quber...大约 27 分钟研发后端Kafka

Kafka 说明

本章,我们主要从Kafka简介Kafka安装Zookeeper配置Kafka配置配置主题用户权限演示动态增加用户常用命令这几个方面对Kafka进行介绍!

提示

本章的内容,我们是基于Kafka-3.5.1版本进行安装和配置的,因此,特别需要注意一定要按照本文说明进行安装和配置。

因为在Kafka2.8+版本以后,各种配置的方式有所改变!!!

1、🍇Kafka 简介

1.1、介绍

官网地址:https://kafka.apache.org/open in new window

官网文档:https://kafka.apache.org/documentation/open in new window

Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 Web/Nginx 日志、访问日志,消息服务等等,Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。

1.2、使用场景

  1. 日志收集:可以用 kafka 收集各种服务的日志 ,通过已统一接口的形式开放给各种消费者;

  2. 消息系统:解耦生产和消费者,缓存消息;

  3. 用户活动追踪:kafka 可以记录 webapp 或 app 用户的各种活动,如浏览网页,点击等活动,这些活动可以发送到 kafka,然后订阅者通过订阅这些消息来做监控;

  4. 运营指标:可以用于监控各种数据。

1.3、设计目标

  1. 以时间复杂度为 O(1)的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间的访问性能;

  2. 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输;

  3. 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 partition 内的消息顺序传输;

  4. 同时支持离线数据处理和实时数据处理;

  5. 支持在线水平扩展。

1.4、特点

  1. 解耦。Kafka 具备消息系统的优点,只要生产者和消费者数据两端遵循接口约束,就可以自行扩展或修改数据处理的业务过程;

  2. 高吞吐量、低延迟。即使在非常廉价的机器上,Kafka 也能做到每秒处理几十万条消息,而它的延迟最低只有几毫秒;

  3. 持久性。Kafka 可以将消息直接持久化在普通磁盘上,且磁盘读写性能优异;

  4. Kafka 集群支持热扩展,Kaka 集群启动运行后,用户可以直接向集群添;

  5. 容错性。Kafka 会将数据备份到多台服务器节点中,即使 Kafka 集群中的某一台加新的 Kafka 服务节点宕机,也不会影响整个系统的功能;

  6. 支持多种客户端语言。Kafka 支持 Java、.NET、PHP、Python 等多种语言;

  7. 支持多生产者和多消费者。

1.5、基本概念

名称说明
broker消息中间件处理节点,一个 broker 就是一个 kafka 节点,多个 broker 构成一个 kafka 集群
topickafka 根据消息进行分类,发布到 kafka 的每个消息都有一个对应的 topic
producer消息生产(发布)者
consumer消息消费(订阅)者
consumergroup消息订阅集群,一个消息可以被多个 consumergroup 消费,但是一个 consumergroup 只有一个 consumer 可以消费消息

2、🍈Kafka 安装

以下我们主要介绍 Kafka 在 Windows 环境中的安装过程。

2.1、下载

官网下载地址:https://kafka.apache.org/downloadsopen in new window

或者清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/open in new window,建议在此镜像中去下载,因为速度快!!!

如我们下载:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.5.1/kafka_2.13-3.5.1.tgzopen in new window

2.2、安装

我们将下载下来的压缩文件kafka_2.13-3.5.1.tgz解压到需要安装的目录,如:D:\Net_Program\Net_Kafka,具体的解压文件如下图所示:

解压
解压

说明

Kafka2.8+已经内置了 Zookeeper,因此,我们可以使用内置的 Zookeeper 即可。


3、🍑Zookeeper 配置

3.1、修改 zookeeper.properties

我们在 config 文件夹下找到zookeeper.properties文件,编辑该文件,修改dataDir配置和增加dataLogDir配置,dataDir用于存放数据,dataLogDir用于存放日志,具体配置如下所示:

# 数据存放目录
dataDir=D:\\Net_Program\\Net_Kafka\\data-zoo-data
# 日志存放目录,为了更好的性能,通常将日志文件挂载在单独的磁盘分区
dataLogDir=D:\\Net_Program\\Net_Kafka\\data-zoo-log
完整配置
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# dataDir=/tmp/zookeeper
# 数据存放目录
dataDir=D:\\Net_Program\\Net_Kafka\\data-zoo-data
# 日志存放目录,为了更好的性能,通常将日志文件挂载在单独的磁盘分区
dataLogDir=D:\\Net_Program\\Net_Kafka\\data-zoo-log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
配置
配置

3.2、运行 Zookeeper

在 Kafka 的安装目录D:\Net_Program\Net_Kafka按住 Shift+鼠标右键,选择在此处打开 Powershell 窗口(S),然后执行如下命令:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
运行Zookeeper
运行Zookeeper

上述命令没有报错则证明 Zookeeper 运行成功!

如下所示为验证 Zookeeper 的端口号2181是否为启动状态,open表示启动状态:

Zookeeper端口检测
Zookeeper端口检测

3.3、将 Zookeeper 作为 Windows 服务运行

此处我们使用NSSM作为服务的安装工具。

定位到NSSM工具所在的目录,并先按住Shift建,然后点击鼠标右键,选择在此处打开 Powershell 窗口(S),如下图所示:

NSSM
NSSM

然后我们输入如下命令调出设置窗口:

.\nssm install
NSSM
NSSM

然后在弹出窗口中进行如下配置:

  • Application 选项卡(必填):

    • Path:选择zookeeper-server-start.bat文件所在地址,完整地址为D:\Net_Program\Net_Kafka\bin\windows\zookeeper-server-start.bat

    • Startup directory:在Path选择后会自动填充(此项不用管);

    • Arguments:为启动服务的参数,此处填入D:\Net_Program\Net_Kafka\config\zookeeper.properties

    • Service name:填写服务的名称,如:Kafka-Zookeeper-Service

  • Details(可选):

    • Display name:服务显示名称,如:Kafka-Zookeeper-Service;

    • Description:服务描述内容,如:Kafka 依赖的 Zookeeper 服务!!!。

最后我们点击Install service按钮即可。

NSSM
NSSM
NSSM
NSSM
NSSM
NSSM

此时,我们打开 Windows 服务即可看到多了一个名称为Kafka-Zookeeper-Service的服务了。

这时候,我们可以将3.2 运行 Zookeeper中运行的 PowerShell 窗口关闭了,然后运行该服务,如下图所示:

NSSM
NSSM

3.4、创建 SCRAM 凭证

Kafka 中的 SCRAM 实现使用 Zookeeper 作为凭证存储。

使用 kafka-configs.bat 在 zookeeper 中创建凭证,对于启用的每个 SCRAM 机制,必须通过添加具有机制名称的配置来创建凭据。

必须在启动 Kafka 代理之前创建代理间通信的凭证,否则 kafka 服务启动会报错。

可以动态创建和更新客户端凭证,更新后的凭证将用于验证新连接。

  1. 创建用户

    在 Kafka 安装目录再新开一个 PowerShell 窗口,然后分别执行如下命令:

    # 创建管理员用户:admin
    .\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name admin
    
    # 创建读和写权限用户:quber
    .\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name quber
    
    # 创建读权限用户:quber1
    .\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name quber1
    
    创建3个用户
    创建3个用户

    说明:

    • 上述命令创建了 3 个用户,分别是 admin、quber、quber1;

    • admin:该用户主要用于超级管理员身份,后续配置会用到;

    • quber:该用户主要用于客户端生产和消费,同时拥有读和写的权限(此处还没设置用户的读写权限,后续会设置);

    • quber1:该用户主要用于客户端消费,只拥有的权限(此处还没设置用户的读权限,后续会设置)。

  2. 查看凭证

    使用如下命令可查看所有用户凭证:

    .\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --describe --entity-type users
    
    查看所有用户凭证
    查看所有用户凭证

    使用如下命令可查看单个用户凭证:

    .\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --describe --entity-type users --entity-name admin
    
    查看单个用户凭证
    查看单个用户凭证
  3. 删除凭证

    使用如下命令可删除某个用户凭证:

    .\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name quber
    
    删除某个用户凭证
    删除某个用户凭证

4、🍒Kafka 配置

4.1、修改 server.properties

我们在 config 文件夹下找到server.properties文件,编辑该文件,在该文件最后加入如下配置:

# SASL身份认证
# 内网IP
listeners=SASL_PLAINTEXT://192.168.20.31:9092
# 外网IP
advertised.listeners=SASL_PLAINTEXT://192.168.20.31:9092
# 使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
# SASL机制
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# 指定认证机制类  注意:kafka低版本使用 kafka.security.auth.SimpleAclAuthorizer,具体查阅官方文档
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
allow.everyone.if.no.acl.found=false
# auto.create.topics.enable=true
# delete.topic.enable=true
# 超级管理员权限用户
super.users=User:admin

提示

上述配置中,指定认证机制类请使用kafka.security.authorizer.AclAuthorizer,低版本使用kafka.security.auth.SimpleAclAuthorizer

super.users=User:后面的admin就是之前我们创建的超级管理员。

完整配置如下所示:

server.properties 完整配置
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
# broker的全局唯一编号,不能重复
broker.id=0

# 用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
# 处理网络请求的线程数量,默认为3
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
# 用来处理磁盘IO的线程数量,默认为8
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
# 发送套接字的缓冲区大小,默认为102400
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
# 接受套接字的缓冲区大小,默认为102400
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
# 请求套接字的缓冲区大小,默认为104857600
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# Kafka日志存放的路径
log.dirs=D:/Net_Program/Net_Kafka/data-kafka-log

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# topic在当前broker上的分片个数,默认为1
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
# 用来恢复和清理data下数据的线程数量,默认为1
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
# partion buffer中,消息的条数达到阈值,将触发flush到磁盘
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
# 消息buffer的时间,达到阈值,将触发flush到磁盘
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
# 默认消息的最大持久化时间,720小时,30天,默认为168
log.retention.hours=720

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824
# 日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# 周期性检查文件大小的时间,默认为300000
log.retention.check.interval.ms=300000

# 日志清理是否打开,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# zookeeper集群的地址,可以是多个,多个之间用逗号分割hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
# zookeeper链接超时时间,默认为18000
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

############################# 其他 #############################

# 消息保存的最大值20M
message.max.byte=20971520

# kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
default.replication.factor=2

# 取消息的最大直接数
replica.fetch.max.bytes=5242880

# SASL身份认证
# 内网IP
listeners=SASL_PLAINTEXT://192.168.20.31:9092
# 外网IP
advertised.listeners=SASL_PLAINTEXT://192.168.20.31:9092
# 使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
# SASL机制
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# 指定认证机制类  注意:kafka低版本使用 kafka.security.auth.SimpleAclAuthorizer,具体查阅官方文档
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
allow.everyone.if.no.acl.found=false
# auto.create.topics.enable=true
# delete.topic.enable=true
# 超级管理员权限用户
super.users=User:admin

4.2、配置 kafka_server_jaas.conf

我们在 config 文件夹下新建kafka_server_jaas.conf文件,并将如下内容填入其中:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
	username="admin"
	password="123456";
};

4.3、修改 kafka-server-start.bat

我们在 bin\windows 文件夹下找到kafka-server-start.bat文件,编辑该文件,在"%~dp0kafka-run-class.bat" kafka.Kafka %*的上一行增加如下配置:

set KAFKA_OPTS=-Djava.security.auth.login.config=file:%~dp0../../config/kafka_server_jaas.conf
修改kafka-server-start.bat
修改kafka-server-start.bat

4.4、运行 Kafka

在 Kafka 的安装目录D:\Net_Program\Net_Kafka按住 Shift+鼠标右键,选择在此处打开 Powershell 窗口(S),然后执行如下命令:

.\bin\windows\kafka-server-start.bat .\config\server.properties
运行Kafka
运行Kafka

上述命令没有报错则证明 Kafka 运行成功!

如下所示为验证 Kafka 的端口号9092是否为启动状态,open表示启动状态:

Kafka端口检测
Kafka端口检测

4.5、将 Kafka 作为 Windows 服务运行

此处我们使用NSSM作为服务的安装工具。

定位到 NSSM 工具所在的目录,并先按住 Shift 建,然后点击鼠标右键,选择在此处打开 Powershell 窗口(S),如下图所示:

NSSM
NSSM

然后我们输入如下命令调出设置窗口:

.\nssm install
NSSM
NSSM

然后在弹出窗口中进行如下配置:

  • Application 选项卡(必填):

    • Path:选择kafka-server-start.bat文件所在地址,完整地址为D:\Net_Program\Net_Kafka\bin\windows\kafka-server-start.bat

    • Startup directory:在Path选择后会自动填充(此项不用管);

    • Arguments:填写server.properties文件所在地址,完整地址为D:\Net_Program\Net_Kafka\config\server.properties

    • Service name:填写服务的名称,如:Kafka-Service

  • Details(可选):

    • Display name:服务显示名称,如:Kafka-Service;

    • Description:服务描述内容,如:Kafka 服务!!!。

最后我们点击Install service按钮即可。

NSSM
NSSM
NSSM
NSSM
NSSM
NSSM

此时,我们打开 Windows 服务即可看到多了一个名称为Kafka-Service的服务了。

这时候,我们可以将4.4 运行 Kafka中运行的 PowerShell 窗口关闭了,然后运行该服务,如下图所示:

NSSM
NSSM

5、🍓 配置主题

5.1、配置 adminclient-configs.conf

我们在 config 文件夹下新建adminclient-configs.conf文件,,并将如下内容填入其中:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
# 用户名密码配置为上述步骤创建的管理员账户
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="123456";

提示

上述配置中,admin就是之前我们创建的超级管理员。

5.2、查看所有主题

在 PowerShell 中执行如下命令可查看所有主题:

.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --list --command-config .\config\adminclient-configs.conf
查看所有主题
查看所有主题

注意

命令中用到的 IP 地址需要和server.properties文件中listeners配置的 IP 保持一致,否则连接不上,后续其他 Kafka 命令也是如此!!!

5.3、创建主题

在 PowerShell 中执行如下命令可创建某个主题:

.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --create --replication-factor 1 --partitions 3 --topic T1 --command-config .\config\adminclient-configs.conf

上述命令为创建主题T1指定分区数为 3,副本数为 1),如下图所示:

创建主题
创建主题

5.4、查看某个主题

在 PowerShell 中执行如下命令可查看某个主题:

.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --describe --topic T1 --command-config .\config\adminclient-configs.conf
查看某个主题
查看某个主题

5.5、修改某个主题

在 PowerShell 中执行如下命令可修改某个主题:

.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --alter --topic T1 --partitions 4 --command-config .\config\adminclient-configs.conf

上述命令为将主题T1指定分区数修改为 4,如下图所示:

修改某个主题
修改某个主题

6、🍍 用户权限

还记得在之前我们创建了 3 个用户,分别是 admin、quber、quber1 吗?

其中 admin 为超级管理员用户,quber 和 quber1 位客户端用户,其中 quber 我们需要给他设置读和写的权限,quber1 我们需要给他设置读的权限,方便后面演示。

6.1、配置用户读写权限

在 PowerShell 中执行如下命令:

# 给用户quber赋予读和写权限(主题为T1)
.\bin\windows\kafka-acls.bat --bootstrap-server 192.168.20.31:9092 --add --allow-principal User:quber --operation Read --operation Write --topic T1 --command-config .\config\adminclient-configs.conf

# 给用户quber赋予读权限所属的分组(分组为G1)
.\bin\windows\kafka-acls.bat --bootstrap-server 192.168.20.31:9092 --add --allow-principal User:quber --operation Read --group G1 --command-config .\config\adminclient-configs.conf

上述命令代表,给用户 quber 赋予读和写的权限,其中主题为 T1,分组为 G1,如下图所示:

配置用户读写权限
配置用户读写权限

6.2、配置用户读权限

在 PowerShell 中执行如下命令:

# 给用户quber1赋予读权限(主题为T1,分组为G1)
.\bin\windows\kafka-acls.bat --bootstrap-server 192.168.20.31:9092 --add --allow-principal User:quber1 --operation Read --topic T1 --group G1 --command-config .\config\adminclient-configs.conf

上述命令代表,给用户 quber1 赋予读的权限,其中主题为 T1,分组为 G1,如下图所示:

配置用户读权限
配置用户读权限

6.3、查看用户权限

在 PowerShell 中执行如下命令:

.\bin\windows\kafka-acls.bat --authorizer-properties zookeeper.connect=192.168.20.31:2181 --list

上述命令代表,获取所有用户权限,如下图所示:

查看用户权限
查看用户权限

从上图我们可以看到,quber 可以写和读主题 T1,quber1 只能读主题 T1,同时针对消费权限,用户 quber 和 quber1 都拥有分组 G1!


7、🌽 演示(Kafka)

以下演示我们使用的是 Kafka 安装包中自带的生产者(kafka-console-producer.bat)和消费者(kafka-console-consumer.bat)工具。

7.1、修改 producer.properties

在 config 文件夹中,我们在producer.properties文件最后加入如下内容:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="quber" password="123456";

上述配置中的username="quber"quber就是我们创建的拥有读和写权限的用户!!!

7.2、修改 consumer.properties

在 config 文件夹中,我们在consumer.properties文件最后加入如下内容:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="quber1" password="123456";

同时,我们需要将consumer.properties文件中的group.id修改为G1,因为之前我们是将quber1 用户分配给了 G1 这个分组

consumer.properties 文件的完整配置
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# consumer group id
group.id=G1

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="quber1" password="123456";

上述配置中的username="quber1"quber1就是我们创建的拥有读权限的用户!!!

7.3、启动生产者

我们可以使用 Kafka 安装包中的kafka-console-producer.bat文件来作为生产者进行演示。

在 PowerShell 中执行如下命令:

.\bin\windows\kafka-console-producer.bat --bootstrap-server 192.168.20.31:9092 --topic T1 --producer.config .\config\producer.properties
启动生产者
启动生产者

7.4、启动消费者

我们可以使用 Kafka 安装包中的kafka-console-consumer.bat文件来作为消费者进行演示。

在 PowerShell 中执行如下命令:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server 192.168.20.31:9092 --topic T1 --consumer.config .\config\consumer.properties
启动消费者
启动消费者

7.5、最终效果

我们在生产者窗口输入内容,即可在消费者窗口看到消费的消息,具体如下图所示:

最终效果
最终效果

8、🍄 演示(.Net6)

以下演示我们将使用.Net6 项目进行演示,用到的组件为Confluent.Kafkaopen in new window

8.1、新建.Net6 生产者和消费者项目

我们分别新建基于.Net6 的生产者和消费者项目,此处我们新建控制台项目,方便演示,如下图所示我们新建好的项目:

.Net6项目
.Net6项目

8.2、获取 Confluent.Kafka 组件包

我们分别在这 2 个项目中安装获取Confluent.Kafka组件包。

8.3、拷贝演示代码

我们在Confluent.Kafka 组件包开源地址open in new window中,直接将生产者和消费者的代码拷贝过来。

  1. 生产者代码
Program.cs
// See https://aka.ms/new-console-template for more information
//Console.WriteLine("Hello, World!");

using Confluent.Kafka;

//Kafka地址和主题
string brokerList = "192.168.20.31:9092";
string topicName = "T1";

var config = new ProducerConfig
{
    BootstrapServers = brokerList,
    SaslMechanism = SaslMechanism.ScramSha256,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    ReconnectBackoffMaxMs = 60000,
    SaslUsername = "quber",
    SaslPassword = "123456",
};

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
    Console.WriteLine("\n-----------------------------------------------------------------------");
    Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
    Console.WriteLine("-----------------------------------------------------------------------");
    Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");
    Console.WriteLine("> key value<Enter>");
    Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");
    Console.WriteLine("> value<enter>");
    Console.WriteLine("Ctrl-C to quit.\n");

    var cancelled = false;
    Console.CancelKeyPress += (_, e) => {
        e.Cancel = true; // prevent the process from terminating.
        cancelled = true;
    };

    while (!cancelled)
    {
        Console.Write("> ");

        string text;
        try
        {
            text = Console.ReadLine();
        }
        catch (IOException)
        {
            // IO exception is thrown when ConsoleCancelEventArgs.Cancel == true.
            break;
        }
        if (text == null)
        {
            // Console returned null before
            // the CancelKeyPress was treated
            break;
        }

        string key = null;
        string val = text;

        // split line if both key and value specified.
        int index = text.IndexOf(" ");
        if (index != -1)
        {
            key = text.Substring(0, index);
            val = text.Substring(index + 1);
        }

        try
        {
            // Note: Awaiting the asynchronous produce request below prevents flow of execution
            // from proceeding until the acknowledgement from the broker is received (at the
            // expense of low throughput).
            var deliveryReport = await producer.ProduceAsync(
                topicName, new Message<string, string> { Key = key, Value = val });

            Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
        }
        catch (ProduceException<string, string> e)
        {
            Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
        }
    }

    // Since we are producing synchronously, at this point there will be no messages
    // in-flight and no delivery reports waiting to be acknowledged, so there is no
    // need to call producer.Flush before disposing the producer.
}
  1. 消费者代码
Program.cs
// See https://aka.ms/new-console-template for more information
//Console.WriteLine("Hello, World!");

using Confluent.Kafka;

/// <summary>
///     In this example
///         - offsets are automatically committed.
///         - no extra thread is created for the Poll (Consume) loop.
/// </summary>
void Run_Consume( CancellationToken cancellationToken)
{
    var topics = "T1";

    var config = new ConsumerConfig
    {
        BootstrapServers = "192.168.20.31:9092",
        GroupId = "G1",
        EnableAutoCommit = false,//设置非自动偏移,业务逻辑完成后手动处理偏移,防止数据丢失
        AutoOffsetReset = AutoOffsetReset.Earliest,
        SaslMechanism = SaslMechanism.ScramSha256,
        SecurityProtocol = SecurityProtocol.SaslPlaintext,
        ReconnectBackoffMaxMs = 60000,

        //EnableAutoOffsetStore = false,
        //StatisticsIntervalMs = 5000,
        //SessionTimeoutMs = 6000,
        //EnablePartitionEof = true,
        //// A good introduction to the CooperativeSticky assignor and incremental rebalancing:
        //// https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/
        //PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,

        SaslUsername = "quber1",
        SaslPassword = "123456",

        //SslCaLocation = AppDomain.CurrentDomain.BaseDirectory + SslCaLocation
    };

    // Note: If a key or value deserializer is not set (as is the case below), the
    // deserializer corresponding to the appropriate type from Confluent.Kafka.Deserializers
    // will be used automatically (where available). The default deserializer for string
    // is UTF8. The default deserializer for Ignore returns null for all input data
    // (including non-null data).
    using (var consumer = new ConsumerBuilder<Ignore, string>(config)
        // Note: All handlers are called on the main .Consume thread.
        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
        .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
        .SetPartitionsAssignedHandler((c, partitions) =>
        {
            // Since a cooperative assignor (CooperativeSticky) has been configured, the
            // partition assignment is incremental (adds partitions to any existing assignment).
            Console.WriteLine(
                "Partitions incrementally assigned: [" +
                string.Join(',', partitions.Select(p => p.Partition.Value)) +
                "], all: [" +
                string.Join(',', c.Assignment.Concat(partitions).Select(p => p.Partition.Value)) +
                "]");

            // Possibly manually specify start offsets by returning a list of topic/partition/offsets
            // to assign to, e.g.:
            // return partitions.Select(tp => new TopicPartitionOffset(tp, externalOffsets[tp]));
        })
        .SetPartitionsRevokedHandler((c, partitions) =>
        {
            // Since a cooperative assignor (CooperativeSticky) has been configured, the revoked
            // assignment is incremental (may remove only some partitions of the current assignment).
            var remaining = c.Assignment.Where(atp => partitions.Where(rtp => rtp.TopicPartition == atp).Count() == 0);
            Console.WriteLine(
                "Partitions incrementally revoked: [" +
                string.Join(',', partitions.Select(p => p.Partition.Value)) +
                "], remaining: [" +
                string.Join(',', remaining.Select(p => p.Partition.Value)) +
                "]");
        })
        .SetPartitionsLostHandler((c, partitions) =>
        {
            // The lost partitions handler is called when the consumer detects that it has lost ownership
            // of its assignment (fallen out of the group).
            Console.WriteLine($"Partitions were lost: [{string.Join(", ", partitions)}]");
        })
        .Build())
    {
        consumer.Subscribe(topics);

        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);

                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine(
                            $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

                        continue;
                    }

                    Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                    try
                    {
                        // Store the offset associated with consumeResult to a local cache. Stored offsets are committed to Kafka by a background thread every AutoCommitIntervalMs.
                        // The offset stored is actually the offset of the consumeResult + 1 since by convention, committed offsets specify the next message to consume.
                        // If EnableAutoOffsetStore had been set to the default value true, the .NET client would automatically store offsets immediately prior to delivering messages to the application.
                        // Explicitly storing offsets after processing gives at-least once semantics, the default behavior does not.
                        consumer.StoreOffset(consumeResult);
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine($"Store Offset error: {e.Error.Reason}");
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Closing consumer.");
            consumer.Close();
        }
    }
}


CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
    e.Cancel = true; // prevent the process from terminating.
    cts.Cancel();
};

Run_Consume( cts.Token);

8.3、最终效果

我们在生产者窗口输入内容,即可在消费者窗口看到消费的消息,具体如下图所示:

最终效果
最终效果

9、🍉 动态增加用户

3.4、创建 SCRAM 凭证中,我们提到了使用SCRAM方式来实现安全认证,因此,我们可以在不重启Kafka服务的情况下动态增加相应的用户

如下所示命令就是相关的动态增加用户以及设置用户读写权限:

# 增加用户quber2
.\bin\windows\kafka-configs.bat --zookeeper 192.168.20.31:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name quber2

# 增加用户quber3
.\bin\windows\kafka-configs.bat --zookeeper 192.168.20.31:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name quber3

# 为用户quber2设置写权限(主题为T1)
.\bin\windows\kafka-acls.bat --bootstrap-server 192.168.20.31:9092 --add --allow-principal User:quber2 --operation Write --topic T1 --command-config .\config\adminclient-configs.conf

# 为用户quber3设置读权限(主题为T1,分组为G1)
.\bin\windows\kafka-acls.bat --bootstrap-server 192.168.20.31:9092 --add --allow-principal User:quber3 --operation Read --topic T1 --group G1 --command-config .\config\adminclient-configs.conf

Kafka 安全认证方式

验证方式Kafka 版本特点
SASL/PLAIN0.10.0.0不能动态增加用户
SASL/SCRAM-SHA-2560.10.2.0可以动态增加用户
SASL/Kerberos0.9.0.0需要独立部署验证服务
SASL/OAUTHBEARER2.0.0需自己实现接口实现 token 的创建和验证,需要额外 Oauth 服务

10、🥦 常用命令

10.1、SCRAM 凭证命令

# 创建管理员用户:admin
.\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name admin

# 创建读和写权限用户:quber
.\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name quber

# 查看所有用户凭证
.\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --describe --entity-type users

# 查看单个用户凭证
.\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --describe --entity-type users --entity-name admin

# 删除某个用户凭证
.\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name quber

10.2、主题命令

# 查看所有主题
.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --list --command-config .\config\adminclient-configs.conf

# 创建主题
.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --create --replication-factor 1 --partitions 3 --topic T1 --command-config .\config\adminclient-configs.conf

# 查看某个主题
.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --describe --topic T1 --command-config .\config\adminclient-configs.conf

# 修改某个主题
.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --alter --topic T1 --partitions 4 --command-config .\config\adminclient-configs.conf

# 删除某个主题
.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --delete --topic T2 --command-config .\config\adminclient-configs.conf

删除某个主题有问题,会导致 Kafka 服务启动不了,请谨慎操作

删除某个主题命令如下所示:

.\bin\windows\kafka-topics.bat --bootstrap-server 192.168.20.31:9092 --delete --topic T2 --command-config .\config\adminclient-configs.conf

10.3、权限命令

# 给用户quber赋予读和写权限(主题为T1)
.\bin\windows\kafka-acls.bat --bootstrap-server 192.168.20.31:9092 --add --allow-principal User:quber --operation Read --operation Write --topic T1 --command-config .\config\adminclient-configs.conf

# 给用户quber赋予读权限所属的分组(分组为G1)
.\bin\windows\kafka-acls.bat --bootstrap-server 192.168.20.31:9092 --add --allow-principal User:quber --operation Read --group G1 --command-config .\config\adminclient-configs.conf

# 给用户quber1赋予读权限(主题为T1,分组为G1)
.\bin\windows\kafka-acls.bat --bootstrap-server 192.168.20.31:9092 --add --allow-principal User:quber1 --operation Read --topic T1 --group G1 --command-config .\config\adminclient-configs.conf

# 查看用户权限
.\bin\windows\kafka-acls.bat --authorizer-properties zookeeper.connect=192.168.20.31:2181 --list

10.4、其他命令

# 查看Kafka版本
.\bin\windows\kafka-configs.bat --describe --bootstrap-server 127.0.0.1:9092 --version

11、Kafka-Manager

11.1、简介

为了简化开发者和服务工程师维护 Kafka 集群的工作,yahoo 构建了一个叫做 Kafka 管理器的基于 Web 工具,叫做Kafka Manager

这个管理工具可以很容易地发现分布在集群中的哪些 Topic 分布不均匀,或者是分区在整个集群分布不均匀的的情况。

它支持管理多个集群、选择副本、副本重新分配以及创建 Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,有如下功能:

  1. 管理多个 kafka 集群;

  2. 便捷的检查 kafka 集群状态(topics、brokers、备份分布情况、分区分布情况);

  3. 选择你要运行的副本;

  4. 基于当前分区状况进行;

  5. 可以选择 Topic 配置并创建 topic(0.8.1.1 和 0.8.2 的配置不同);

  6. 删除 opic(只支持 0.8.2 以上的版本并且要在 broker 配置中设置 delete.topic.enable=true);

  7. Topic list 会指明哪些 Topic 被删除(在 0.8.2 以上版本适用);

  8. 为已存在的 Topic 增加分区;

  9. 为已存在的 Topic 更新配置;

  10. 在多个 Topic 上批量重分区;

  11. 在多个 Topic 上批量重分区(可选 partition broker 位置)。

11.2、安装

官网下载地址:https://github.com/yahoo/CMAK/releasesopen in new window

此处我们下载:https://github.com/yahoo/CMAK/releases/download/3.0.0.6/cmak-3.0.0.6.zipopen in new window

  1. 解压安装

    我们将下载下来的cmak-3.0.0.6.zip文件解压到D:\Net_Program\Net_KafkaManager目录。

  2. 配置

    我们在 conf 文件夹中打开application.conf文件,然后修改kafka-manager.zkhosts的值,如下所示:

    # https://github.com/yahoo/CMAK/issues/713
    kafka-manager.zkhosts="192.168.3.200:2181"
    kafka-manager.zkhosts=${?ZK_HOSTS}
    

上述安装配置的各个文件已经打包好,参见:3.5.1-各个文件的配置

可以直接覆盖,但是覆盖后需要修改其中的一些配置,比如用户名称和密码、IP 地址等!!!