admin管理员组文章数量:1576349
一 Kafka介绍
1)kafka定义
Kafka 被称为下一代分布式消息系统,由 Scala 和 Java编写,是非营利性组织ASF(Apache Software Foundation)基金会中的一个开源项目,比如:HTTP Server、Tomcat、Hadoop、ActiveMQ等开源软件都属于 Apache基金会的开源软件,类似的消息系统还有RabbitMQ、ActiveMQ、ZeroMQ。
Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家组织中同时投入生产协同工作。
2)传统消息队列应用场景
- 削峰填谷
诸如电商业务中的秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列可提供削峰填谷的服务来解决该问题。
- 异步解耦
交易系统作为淘宝等电商的最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列可实现异步通信和应用解耦,确保主站业务的连续性。
- 顺序收发
细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列提供的顺序消息即保证消息FIFO。
- 分布式事务一致性
交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
- 分布式缓存同步
电商的大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列构建分布式缓存,实时通知商品数据的变化。
3)kafka特点和优势
特点:
- 分布式: 多机实现,不允许单机
- 分区: 一个消息.可以拆分出多个,分别存储在多个位置
- 多副本: 防止信息丢失,可以多来几个备份
- 多订阅者: 可以有很多应用连接kafka
- Zookeeper: 早期版本的Kafka依赖于zookeeper, 2021年4月19日Kafka 2.8.0正式发布,此版本包括了很多重要改动,最主要的是kafka通过自我管理的仲裁来替代ZooKeeper,即Kafka将不再需要ZooKeeper!!!
优势:
- 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka 服务器分区消息。
- 分布式: Kafka 基于分布式集群实现高可用的容错机制,可以实现自动的故障转移。
- 顺序保证:在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。 Kafka保证一个Partiton内的消息的有序性(分区间数据是无序的,如果对数据的顺序有要求,应将在创建主题时将分区数partitions设置为1)。
- 支持 Hadoop 并行数据加载。
- 通常用于大数据场合,传递单条消息比较大,而Rabbitmq 消息主要是传输业务的指令数据,单条数据较小。
4)kafka角色介绍
(1)Producer:Producer即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka broker
(2)Consumer:消费者,用于消费消息,即处理消息
Broker:Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例,假设每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如: broker-0、broker-1等……
(3)Topic :消息的主题,可以理解为消息的分类,一个Topic相当于数据库中的一张表,一条消息相当于关系数据库的一条记录,一个Topic或者相当于Redis中列表类型的一个Key,一条消息即为列表中的一个元素。kafka的数据就保存在topic。在每个broker上都可以创建多个topic。物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic的消息虽然保存于一个或多个broker 上, 但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处,topic 在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的topic 才能消费topic中的消息
(4)Consumer group: 每个consumer 属于一个特定的consumer group(可为每个consumer 指定 group name,若不指定 group name 则属于默认的group),同一topic的一条消息只能被同一个consumer group 内的一个consumer 消费,类似于一对一的单播机制,但多个consumer group 可同时消费这一消息,类似于一对多的多播机制
(5)Partition :是物理上的概念,每个topic 分割为一个或多个partition,即一个topic切分为多份.创建 topic时可指定 partition 数量,partition的表现形式就是一个一个的文件夹,该文件夹下存储该partition的数据和索引文件,分区的作用还可以实现负载均衡,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,一般Partition数不要超过节点数,注意同一个partition数据是有顺序的,但不同的partition则是无序的
(6)Replication: 同样数据的副本,包括leader和follower的副本数,基本于数据安全,建议至少2个,是Kafka的高可靠性的保障,和ES的副本有所不同,Kafka中的副本数包括主分片数,而ES中的副本数不包括主分片数
为了实现数据的高可用,比如将分区 0 的数据分散到不同的kafka 节点,每一个分区都有一个 broker 作为 Leader 和一个 broker 作为Follower,类似于ES中的主分片和副本分片。
假设分区为 3, 即分三个分区0-2,副本为3,即每个分区都有一个 leader,再加两个follower,分区 0 的leader为服务器A,则服务器 B 和服务器 C 为 A 的follower,而分区 1 的leader为服务器B,则服务器 A 和C 为服务器B 的follower,而分区 2 的leader 为C,则服务器A 和 B 为C 的follower。
AR: Assigned Replicas,分区中的所有副本的统称,包括leader和 follower,AR= lSR+ OSR
lSR:ln Sync Replicas,所有与leader副本保持同步的副本 follower和leader本身组成的集合,包括leader和 follower,是AR的子集
OSR:out-of-Sync Replied,所有与leader副本同步不能同步的 follower的集合,是AR的子集
5)分区和副本的优势
实现存储空间的横向扩容,即将多个kafka服务器的空间组合利用
提升性能,多服务器并行读写
实现高可用,每个分区都有一个主分区即 leader 分布在不同的kafka 服务器,并且有对应follower 分布在和leader不同的服务器上
6)kafka 写入消息的流程
- 生产者(producter)先从kafka集群获取分区的leader
- 生产者(producter)将消息发送给leader
- leader将消息写入本地文件
- followers从leader pull消息
- followers将消息写入本地后向leader发送ACK
- leader收到所有副本的ACK后向producter发送ACK
二 Kafka部署
2.1 单机部署
我这里直接采用集群方式部署,如果需要单机部署,可以参考官网文档部署。
https://kafka.apache/quickstart
2.2 集群部署
1)集群规划
主机名 | IP | 应用 | OS |
---|---|---|---|
node01 | 11.0.1.131 | zookeeper、kafka v_3.6.1 | Euler 21.10 LTS |
node02 | 11.0.1.136 | zookeeper、kafka v_3.6.1 | Euler 21.10 LTS |
node03 | 11.0.1.137 | zookeeper、kafka v_3.6.1 | Euler 21.10 LTS |
2)环境准备 ZooKeeper
zookeeper集群我这里已经提前准备好了,部署过程就省略了,有需要可以去参考 zookeeper 那篇文章。
[root@node01 ~]# /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
[root@zk-02 ~]# /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
[root@zk-03 ~]# /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
3)kafka 集群部署
我这里kafka集群和zookeeper集群复用三台服务器,如有其他需求,可以分开部署,注意都是需要安装java环境的。
【1】下载安装包,是编译好了,解开就能用
# 尽量下载scala版本为2.13的
https://archive.apache/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
# kafka版本格式
kafka_<scala 版本>_<kafka 版本>
# 示例:kafka_2.13-2.7.0.tgz
【2】上传至各个节点
# 创建存放目录(个人习惯)
[root@node01 ~]# mkdir -p /home/weihu/src
[root@node02 ~]# mkdir -p /home/weihu/src
[root@node03 ~]# mkdir -p /home/weihu/src
[root@node01 ~]# ll /home/weihu/src/
总用量 109M
-rw-r--r-- 1 root root 109M 6月 16 14:51 kafka_2.13-3.6.1.tgz
[root@node02 ~]# ll /home/weihu/src/
总用量 109M
-rw-r--r-- 1 root root 109M 6月 16 14:51 kafka_2.13-3.6.1.tgz
[root@node03 ~]# ll /home/weihu/src/
总用量 109M
-rw-r--r-- 1 root root 109M 6月 16 14:45 kafka_2.13-3.6.1.tgz
【3】解压缩
[root@node01 src]# tar -xf kafka_2.13-3.6.1.tgz -C /usr/local/
[root@node01 src]# ll /usr/local/
总用量 48K
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 bin
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 etc
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 games
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 include
drwxr-xr-x 7 root root 4.0K 11月 24 2023 kafka_2.13-3.6.1
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 lib
drwxr-xr-x. 3 root root 4.0K 1月 23 18:51 lib64
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 libexec
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 sbin
drwxr-xr-x. 5 root root 4.0K 1月 23 18:51 share
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 src
drwxr-xr-x 8 root root 4.0K 6月 14 21:48 zookeeper
[root@node02 ~]# cd /home/weihu/src/
[root@node02 src]# tar -xf kafka_2.13-3.6.1.tgz -C /usr/local/
[root@node03 ~]# cd /home/weihu/src/
[root@node03 src]# tar -xf kafka_2.13-3.6.1.tgz -C /usr/local/
【4】改名或者创建软连接
[root@node01 src]# cd /usr/local/
[root@node01 local]# mv kafka_2.13-3.6.1 kafka
[root@node01 local]# ll
总用量 48K
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 bin
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 etc
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 games
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 include
drwxr-xr-x 7 root root 4.0K 11月 24 2023 kafka
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 lib
drwxr-xr-x. 3 root root 4.0K 1月 23 18:51 lib64
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 libexec
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 sbin
drwxr-xr-x. 5 root root 4.0K 1月 23 18:51 share
drwxr-xr-x. 2 root root 4.0K 6月 24 2021 src
drwxr-xr-x 8 root root 4.0K 6月 14 21:48 zookeeper
[root@node02 src]# cd /usr/local/
[root@node02 local]# mv kafka_2.13-3.6.1 kafka
[root@node03 src]# cd /usr/local/
[root@node03 local]# mv kafka_2.13-3.6.1 kafka
# 或者创建软连接
ln -s /usr/local/kafka_2.13-3.6.1 /usr/local/kafka
【5】配置环境变量(可选)
[root@node01 local]# echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node01 local]# . /etc/profile.d/kafka.sh
[root@node02 local]# echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node02 local]# . /etc/profile.d/kafka.sh
[root@node03 local]# echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node03 local]# . /etc/profile.d/kafka.sh
【6】kafka目录查看
[root@node01 local]# ll /usr/local/kafka/
总用量 72K
drwxr-xr-x 3 root root 4.0K 11月 24 2023 bin
drwxr-xr-x 3 root root 4.0K 11月 24 2023 config
drwxr-xr-x 2 root root 12K 6月 16 14:55 libs
-rw-r--r-- 1 root root 15K 11月 24 2023 LICENSE
drwxr-xr-x 2 root root 4.0K 11月 24 2023 licenses
-rw-r--r-- 1 root root 28K 11月 24 2023 NOTICE
drwxr-xr-x 2 root root 4.0K 11月 24 2023 site-docs
【7】配置文件详解
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#
############################# Server Basics #############################
# broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
############################# Socket Server Settings #############################
# 套接字服务器侦听的地址。如果未配置,主机名将等于的值
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# 侦听器名称、主机名和代理将向客户端公布的端口。
# 如果未设置,则使用“listeners”的值。
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 将侦听器名称映射到安全协议,默认情况下它们是相同的。有关更多详细信息,请参阅配置文档
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 处理网络请求的线程数量(服务器用于从网络接收请求并向网络发送响应的线程数)
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区最大大小
socket.request.max.bytes=104857600
############################# Log Basics #############################
# kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/usr/kafka/kafka_2.13-3.6.1/datas
# 每个topic在当前 broker上的默认分区数。更多的分区允许更大的并行性以供使用,但这也会导致代理之间有更多的文件。
num.partitions=1
# 启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数。(用来恢复和清理 data 下数据的线程数量)对于数据目录位于RAID阵列中的安装,建议增加此值。
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# 每个 topic 创建时的副本数,默认时 1 个副本,对于开发测试以外的环境,建议使用大于1的值以确保可用性,如3
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# 强制将数据刷新到磁盘之前要接受的消息数
#log.flush.interval.messages=10000
# 在强制刷新之前,消息可以在日志中停留的最长时间
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
# 基于大小的日志保留策略。除非剩余的
# segments下降到 log.retention.bytes 以下。独立于log.retention.hours的函数.
#log.retention.bytes=1073741824
# 每个 segment 文件的最大大小,默认最大 1G ,当达到此大小时,将创建一个新的segment。
log.segment.bytes=1073741824
# 检查日志段以查看是否可以根据保留策略删除它们的间隔(检查过期数据的时间,默认 5 分钟检查一次是否数据过期)
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper集群连接字符串,一个以逗号分隔的'主机:端口'对,每个对对应一个zk服务器。可以在url中附加一个可选的chroot字符串,以指定所有kafka-znode的根目录。
zookeeper.connect=11.0.1.131:2181,11.0.1.136:2181,11.0.1.137:2181
# 连接到zookeeper 的超时时间(毫秒)
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# 以下配置指定GroupCoordinator将延迟初始使用者重新平衡的时间(以毫秒为单位)。
# 随着新成员加入组,再平衡将进一步延迟group.initial.rebalance.delay.ms的值,最大值为max.poll.interval.ms。
# 默认值为3秒。
# 我们在这里将其覆盖为0,因为它为开发和测试提供了更好的开箱即用体验。
# 但是,在生产环境中,默认值3秒更合适,因为这将有助于避免在应用程序启动期间进行不必要的、可能代价高昂的重新平衡。
group.initial.rebalance.delay.ms=3
【8】修改配置文件
重要!!!
broker.id 一定要修改,每个broker 在集群中的唯一标识,正整数。
listeners 指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
# node01节点
[root@node01 config]# egrep -v "^#|^$" /usr/local/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://11.0.1.131:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/data
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=11.0.1.131:2181,11.0.1.136:2181,11.0.1.137:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# node02节点
[root@node02 local]# egrep -v "^#|^$" /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://11.0.1.136:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/data
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=11.0.1.131:2181,11.0.1.136:2181,11.0.1.137:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# node03节点
[root@node03 local]# egrep -v "^#|^$" /usr/local/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://11.0.1.137:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/data
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=11.0.1.131:2181,11.0.1.136:2181,11.0.1.137:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
【9】可以调整内存,根据业务需求
vim /usr/local/kafka/bin/kafka-server-start.sh
....
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
....
【9】创建数据目录
mkdir /usr/local/kafka/data
【9】启动kafka集群
[root@node01 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@node02 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@node03 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
【10】查看服务状态
[root@node01 ~]# ss -ntl |grep 9092
LISTEN 0 50 [::ffff:11.0.1.131]:9092 *:*
[root@node02 ~]# ss -ntl |grep 9092
LISTEN 0 50 [::ffff:11.0.1.131]:9092 *:*
[root@node03 ~]# ss -ntl |grep 9092
LISTEN 0 50 [::ffff:11.0.1.131]:9092 *:*
三 集群可用性验证
1)客户端连接zookeeper集群,查看kafka的元数据是否已经写入
[root@node03 data]# /usr/local/zookeeper/bin/zkCli.sh -server 11.0.1.131:2181
....
[zk: 11.0.1.131:2181(CONNECTED) 12] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
2)kafka读写数据
kafka-topics.sh #消息的管理命令
kafka-console-producer.sh #生产者的模拟命令
kafka-console-consumer.sh #消费者的模拟命令
【1】获取所有Topic
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 11.0.1.136:9092
# --bootstrap-server 指向哪个kafka节点都可以
【2】创建Topic
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --topic test01 --bootstrap-server 11.0.1.136:9092 --partitions 3 --replication-factor 2
Created topic test01.
# --topic 主题名称
# --partitions 分区数
# --replication-factor 副本数
# 查看
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 11.0.1.136:9092 test01
[root@node01 ~]#
【3】查看Topic详细
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 11.0.1.136:9092 --topic test01
Topic: test01 TopicId: dfdpGX8FQJuXVQcLclmzaA PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: test01 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test01 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test01 Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
# 状态说明:test01 有三个分区分别为0、1、2,分区0的leader是3 (broker.id),分区 0 有2 个副本,并且状态都为 Isr(ln-sync,表示可以参加选举成为 leader)。
# Replicas代表第几个服务器,在第一台服务器中,有0分区的分片和1分区的分片
# 查看数据目录也可以看出来
# test01是topic的名称,0和1就是副本,只不过看不出来是leader还是followers
[root@node01 ~]# ls /usr/local/kafka/data/
cleaner-offset-checkpoint meta.properties replication-offset-checkpoint test01-1
log-start-offset-checkpoint recovery-point-offset-checkpoint test01-0
[root@node02 kafka]# ls /usr/local/kafka/data/
cleaner-offset-checkpoint meta.properties replication-offset-checkpoint test01-2
log-start-offset-checkpoint recovery-point-offset-checkpoint test01-1
[root@node03 data]# ls /usr/local/kafka/data/
cleaner-offset-checkpoint meta.properties replication-offset-checkpoint test01-2
log-start-offset-checkpoint recovery-point-offset-checkpoint test01-0
【4】生产 Topic(生产数据)
# 发送消息命令格式:
kafka-console-producer.sh --broker-list <kafkaIP1>:<端口>,<kafkaIP2>:<端口> --topic <topic名称>
# 产生数据,交互式输入消息,按Ctrl+C退出
[root@node01 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 11.0.1.131:9092 --topic test01
>hi01
>hello02
>wher
>
【5】消费Topic(消费数据)
注意点!!!
-
消息者先生产消息,消费者后续才启动,并且也能收到之前生产的消息
-
同一个消息在同一个group内的消费者只有被一个消费者消费,例如:共500条消息,在一个group内有A1,A2两个消费者,其中A1消费200条,A2消费另外的300条消息。从而实现负载均衡,不同group内的消费者则可以同时消费同一个消息
-
–from-beginning 表示消费前发布的消息也能收到,默认只能收到消费后发布的新消息
# 接收消息命令格式:
kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称> --from-beginning --consumer-property group.id=<组名称>
# 生产数据
[root@node01 ~]#
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 11.0.1.131:9092 --topic test01
>hello
>aaa
>bbb
>
# 消费数据(只能收到消费发布后的新消息)
[root@node02 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --topic test01 --bootstrap-server 11.0.1.136:9092
hello
aaa
bbb
# 消费数据(费前发布的消息也能收到)
[root@node02 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --topic test01 --bootstrap-server 11.0.1.136:9092 --from-beginning
hi01
hello02
wher
hello
aaa
bbb
扩展:生产数据和消费数据连接Kafka集群的任意节点都可以
【6】删除Topic
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --topic test02 --bootstrap-server 11.0.1.136:9092 --partitions 3 --replication-factor 2
Created topic test02.
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 11.0.1.136:9092
__consumer_offsets
test01
test02
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 11.0.1.136:9092 --topic test02
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 11.0.1.136:9092
__consumer_offsets
test01
[root@node01 ~]#
四 Kafka 在 ZooKeeper 里面的存储结构
# topic 结构
/brokers/topics/[topic]
[zk: 11.0.1.131:2181(CONNECTED) 0] ls /brokers/topics
[__consumer_offsets, test01]
[zk: 11.0.1.131:2181(CONNECTED) 1] ls /brokers/topics/test01
[partitions]
[zk: 11.0.1.131:2181(CONNECTED) 2] ls /brokers/topics/test01/partitions
[0, 1, 2]
# partition结构
/brokers/topics/[topic]/partitions/[partitionId]/state
# broker信息
/brokers/ids/[o...N]
五 kafka图形化工具
当需要读取kafka中的数据时,在服务器上查看比较麻烦,有时候数据量较大而且不是很直观。此时就需要一款简洁,使用方便的可视化工具了 — Offset Explorer
什么是 Offset Explorer?
Offset Explorer (之前称 Kafka Tool)是一款kafka的可视化工具,可以,查看kafka的topic ,partion数量,以及查看写入到kafa中的数据,整体页面非常简洁,使用起来也比较容易,他支持 mac ,windows,linux 服务器。
1)下载地址
我这里直接下载windows版
https://www.kafkatool/download.html
2)双击进行安装
3)安装完成后会让其添加kafka集群
4)输入kafka集群信息
1 集群名称,自定义
2 kafka集群地址(格式为host1:port1,host2:port2)
3 kafka版本
5)可以先测试能不能ping通
测试通以后直接连接即可
6)连接成功
7)查看创建的Topic、分区信息等
8)需要将数据格式改为文本,否则看到的数据是转成16进制的
9)查看数据
版权声明:本文标题:Kafka集群部署详解 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/xitong/1727799872a1130694.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论