Kafka集群安装配置

前言

Kafka是一种高吞吐量的分布式发布订阅消息系统,以下记录Kafka集群的安装验证过程。

环境准备&安装文件

Kafka安装配置

  • 上传安装包解压

    1
    2
    # 上传安装包至服务器
    [root@master201 Soft]# tar -xvf kafka_2.11-0.8.2.2
  • 修改server.properties配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 进入kafka配置目录
    [root@master201 Soft]# cd /home/lishijia/Soft/kafka_2.11-0.8.2.2/config
    [root@master201 config]# vim server.properties

    # broker.id与zookeeper myid 类似,必须唯一。当前kafka应用程序得唯一标识
    broker.id=201

    # 此处在末尾记住加上kafka,要不然针对kafka在zookeeper上创建的一些目录全部都是在根目录下
    zookeeper.connect=master201:2181,slave202:2181,slave203:2181/kafka
  • scp安装到其他两台机器&修改broker.id

    1
    2
    3
    4
    5
    6
    7
    8
    [root@master201 Soft]# scp -r kafka_2.11-0.8.2.2/ slave202:/home/lishijia/Soft
    [root@master201 Soft]# scp -r kafka_2.11-0.8.2.2/ slave203:/home/lishijia/Soft

    # 其他两台机器修改对应的broker.id
    [root@master201 config]# vim server.properties
    # 分别修改为对应机器ip的尾数
    broker.id=202
    broker.id=203
  • kafka集群启动

    1
    2
    3
    4
    5
    # 启动kafka之前确保独立的Zookeeper集群已经启动
    # 3台机器都需要执行以下命令
    [root@master201 bin]# ./kafka-server-start.sh ../config/server.properties
    [root@slave202 bin]# ./kafka-server-start.sh ../config/server.properties
    [root@slave203 bin]# ./kafka-server-start.sh ../config/server.properties
  • 通过Zookeeper检查是否启动

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 通过zkCli连接上Zookeeper查看kafka目录状态,通过以下命令可以查看到对应的201,202,203节点已经存在于zookeeper当中
    # 如果把当中的一台broker停掉,那么以下ids目录下的节点则会对应的消失
    # 此处zookeepr使用的是临时节点方式监控对应的主机是否连接上,即如果broker对应的zookeeper连接断开,则当前节点会自动删除通过这种方式监控所存活的broker
    [zk: localhost:2181(CONNECTED) 0] ls /
    [zookeeper, kafka]
    [zk: localhost:2181(CONNECTED) 1] ls /kafka
    [admin, consumers, controller, controller_epoch, brokers, config]
    [zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers
    [ids, topics]
    [zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids
    [201, 202, 203]
    [zk: localhost:2181(CONNECTED) 4]

创建Topic

  • master201启动

    1
    2
    3
    4
    5
    6
    # 我们创建一个名称为lishijiaTop的Topic,5个分区,并且复制因子为3,执行如下命令:
    [root@master201 bin]# ./kafka-topics.sh --create --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --replication-factor 3 --partitions 5 --topic lishijiaTop

    # 查看创建的Topic,执行如下命令:
    [root@master201 bin]# ./kafka-topics.sh --list --zookeeper master201:2181,slave202:2181,slave203:2181/kafka
    [root@master201 bin]# ./kafka-topics.sh --describe --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --topic lishijiaTop

    可以通过如下图看到对应的分区信息,包括存储因子所在的broker

    mark

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # zookeeper所存储的信息如下
    [zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers
    [ids, topics]
    [zk: localhost:2181(CONNECTED) 6] ls /kafka/brokers/topics
    [lishijiaTop]
    [zk: localhost:2181(CONNECTED) 7] ls /kafka/brokers/topics/lishijiaTop
    [partitions]
    [zk: localhost:2181(CONNECTED) 8] ls /kafka/brokers/topics/lishijiaTop/partitions
    [0, 1, 2, 3, 4]
    [zk: localhost:2181(CONNECTED) 9]

创建生产端

  • 通过Kafka自带的kafka-console-producer.sh脚本,来验证演示发消息。

    1
    2
    # 在一个终端,启动Producer,并向我们上面创建的名称为lishijiaTop的Topic中生产消息,执行如下脚本
    [root@master201 bin]# ./kafka-console-producer.sh --broker-list master201:9092,master202:9092,master203:9092 --topic lishijiaTop
  • 发送消息

    可以在Producer终端上输入字符串消息行,然后回车。相当于就把消息发送到kafka当中,并且会根据配置的策略会存储到磁盘当中。

    如下刚才把集群的地址写错了,即报如下错误,调整后一共发了9条消息到kafka集群

创建消费端

  • 通过Kafka自带的kafka-console-consumer.sh脚本,来验证演示消费消息。

    1
    2
    # 在另一个终端,启动Consumer,并订阅我们上面创建的名称为lishijiaTop的Topic中生产的消息,执行如下脚本
    [root@master201 bin]# ./kafka-console-consumer.sh --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --from-beginning --topic lishijiaTop
  • 消费消息如下

    刚才上面broker集群写错了,111,123消息也发送出去了。可以通过如下信息可以看到

  • zookeeper查看consumer对应topic的partition offset信息

    通过如下可以看到:partition0有9条消息,partition1下面有2条消息。一共11条

  • 通过Kafka自带的kafka-consumer-offset-checker.sh脚本,查看对应消费端的offset信息。

    由于在创建消费端的时候没有设置名称,所以zookeeper给了默认的名称,然后这里再通过命令查看consumer消费情况

    1
    [root@master201 bin]# ./kafka-consumer-offset-checker.sh --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --topic lishijiaTop --group console-consumer-76159 --broker-info

Topic其他操作

  • 增加Topic的Partition

    1
    2
    通过kafka-topics.sh 的alter选项 ,将topic1的partitions从1增加到5 
    ./kafka-topics.sh --alter --topic orderTopic --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --partitions 5
    如果当前的这个topic已经有消费,那么可以看到这个信息是有问题的,所以最好把相关的consumer对应的信息修改对,以防一想不到的结果

总结

以上针对Kafka的配置就只修改了broker.id,zookeeper.connect配置信息

这里记录下其他配置信息解释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
分享到 评论