前言
Kafka是一种高吞吐量的分布式发布订阅消息系统,以下记录Kafka集群的安装验证过程。
环境准备&安装文件
安装文件下载
以下为大家准备了安装文件,也可以自己到网上下载
下载地址:http://kafka.apache.org/downloads
| 准备文件 | 下载地址 |
| ————————— | —————————————————————————————— |
| kafka_2.11-0.8.2.2 | 链接: https://pan.baidu.com/s/1dlUjcLjemTcm7jnc0HDiwQ 提取码: typ4 |服务器环境安装(Master201,Slave202,Slave203一共3台服务器)
Kafka安装配置
上传安装包解压
1
2上传安装包至服务器
[root@master201 Soft]# tar -xvf kafka_2.11-0.8.2.2修改server.properties配置文件
1
2
3
4
5
6
7
8
9进入kafka配置目录
[root@master201 Soft]# cd /home/lishijia/Soft/kafka_2.11-0.8.2.2/config
[root@master201 config]# vim server.properties
broker.id与zookeeper myid 类似,必须唯一。当前kafka应用程序得唯一标识
broker.id=201
此处在末尾记住加上kafka,要不然针对kafka在zookeeper上创建的一些目录全部都是在根目录下
zookeeper.connect=master201:2181,slave202:2181,slave203:2181/kafkascp安装到其他两台机器&修改broker.id
1
2
3
4
5
6
7
8[root@master201 Soft]# scp -r kafka_2.11-0.8.2.2/ slave202:/home/lishijia/Soft
[root@master201 Soft]# scp -r kafka_2.11-0.8.2.2/ slave203:/home/lishijia/Soft
其他两台机器修改对应的broker.id
[root@master201 config]# vim server.properties
分别修改为对应机器ip的尾数
broker.id=202
broker.id=203kafka集群启动
1
2
3
4
5启动kafka之前确保独立的Zookeeper集群已经启动
3台机器都需要执行以下命令
[root@master201 bin]# ./kafka-server-start.sh ../config/server.properties
[root@slave202 bin]# ./kafka-server-start.sh ../config/server.properties
[root@slave203 bin]# ./kafka-server-start.sh ../config/server.properties通过Zookeeper检查是否启动
1
2
3
4
5
6
7
8
9
10
11
12通过zkCli连接上Zookeeper查看kafka目录状态,通过以下命令可以查看到对应的201,202,203节点已经存在于zookeeper当中
如果把当中的一台broker停掉,那么以下ids目录下的节点则会对应的消失
此处zookeepr使用的是临时节点方式监控对应的主机是否连接上,即如果broker对应的zookeeper连接断开,则当前节点会自动删除通过这种方式监控所存活的broker
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper, kafka]
[zk: localhost:2181(CONNECTED) 1] ls /kafka
[admin, consumers, controller, controller_epoch, brokers, config]
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers
[ids, topics]
[zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids
[201, 202, 203]
[zk: localhost:2181(CONNECTED) 4]
创建Topic
master201启动
1
2
3
4
5
6我们创建一个名称为lishijiaTop的Topic,5个分区,并且复制因子为3,执行如下命令:
[root@master201 bin]# ./kafka-topics.sh --create --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --replication-factor 3 --partitions 5 --topic lishijiaTop
查看创建的Topic,执行如下命令:
[root@master201 bin]# ./kafka-topics.sh --list --zookeeper master201:2181,slave202:2181,slave203:2181/kafka
[root@master201 bin]# ./kafka-topics.sh --describe --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --topic lishijiaTop可以通过如下图看到对应的分区信息,包括存储因子所在的broker
1
2
3
4
5
6
7
8
9
10zookeeper所存储的信息如下
[zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers
[ids, topics]
[zk: localhost:2181(CONNECTED) 6] ls /kafka/brokers/topics
[lishijiaTop]
[zk: localhost:2181(CONNECTED) 7] ls /kafka/brokers/topics/lishijiaTop
[partitions]
[zk: localhost:2181(CONNECTED) 8] ls /kafka/brokers/topics/lishijiaTop/partitions
[0, 1, 2, 3, 4]
[zk: localhost:2181(CONNECTED) 9]
创建生产端
通过Kafka自带的kafka-console-producer.sh脚本,来验证演示发消息。
1
2在一个终端,启动Producer,并向我们上面创建的名称为lishijiaTop的Topic中生产消息,执行如下脚本
[root@master201 bin]# ./kafka-console-producer.sh --broker-list master201:9092,master202:9092,master203:9092 --topic lishijiaTop发送消息
可以在Producer终端上输入字符串消息行,然后回车。相当于就把消息发送到kafka当中,并且会根据配置的策略会存储到磁盘当中。
如下刚才把集群的地址写错了,即报如下错误,调整后一共发了9条消息到kafka集群
创建消费端
通过Kafka自带的kafka-console-consumer.sh脚本,来验证演示消费消息。
1
2在另一个终端,启动Consumer,并订阅我们上面创建的名称为lishijiaTop的Topic中生产的消息,执行如下脚本
[root@master201 bin]# ./kafka-console-consumer.sh --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --from-beginning --topic lishijiaTop消费消息如下
刚才上面broker集群写错了,111,123消息也发送出去了。可以通过如下信息可以看到
zookeeper查看consumer对应topic的partition offset信息
通过如下可以看到:partition0有9条消息,partition1下面有2条消息。一共11条
通过Kafka自带的kafka-consumer-offset-checker.sh脚本,查看对应消费端的offset信息。
由于在创建消费端的时候没有设置名称,所以zookeeper给了默认的名称,然后这里再通过命令查看consumer消费情况
1
[root@master201 bin]# ./kafka-consumer-offset-checker.sh --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --topic lishijiaTop --group console-consumer-76159 --broker-info
Topic其他操作
增加Topic的Partition
如果当前的这个topic已经有消费,那么可以看到这个信息是有问题的,所以最好把相关的consumer对应的信息修改对,以防一想不到的结果1
2通过kafka-topics.sh 的alter选项 ,将topic1的partitions从1增加到5
./kafka-topics.sh --alter --topic orderTopic --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --partitions 5
总结
以上针对Kafka的配置就只修改了broker.id,zookeeper.connect配置信息
这里记录下其他配置信息解释
1 | broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 |