Kafka命令行使用
启动
# 注解版, 不可直接复制使用
# 前台启动
kafka-server-start.sh \
# 配置文件所在路径
/opt/module/kafka_2.13-3.3.1/config/server.properties
# 复制使用版本
kafka-server-start.sh \
/opt/module/kafka_2.13-3.3.1/config/server.properties
# 后台启动版本
kafka-server-start.sh \
-daemon \
/opt/module/kafka_2.13-3.3.1/config/server.properties
Topic主题
创建
kafka-topics.sh \
# 创建topic
--create \
# 连接kafka broker的host和port
--bootstrap-server hadoop001:9092 \
# 设置分区数(相当于将一个主题拆分成多少个文件夹)
--partitions 2 \
# 设置副本数
--replication-factor 2 \
# 设置topic的名称为xtopic
--topic xtopic
kafka-topics.sh \
--create \
--bootstrap-server hadoop001:9092 \
--partitions 2 \
--replication-factor 2 \
--topic xtopic
压力测试
压力测试和使用的topic主题节点相关, 例如xtopic是2副本2分区, 所以压力测试相当于测试2副本2分区的集群性能.
生产者基准测试
kafka-producer-perf-test.sh \
# 测试使用的主题为xtopic
--topic xtopic \
# 数量为5M
--num-records 5000000 \
# 吞吐量为-1表示无限制
--throughput -1 \
# 数据大小限制为1000B
--record-size 1000 \
# 其他属性配置
--producer-props \
# kafka集群
bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092 \
# ack应答机制
acks=1
kafka-producer-perf-test.sh \
--num-records 5000000 \
--throughput -1 \
--record-size 1000 \
--producer-props \
bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092 \
acks=1 \
--topic xtopic
2副本2分区的测试结果
1副本1分区的测试结果
消费者基准测试
kafka-consumer-perf-test.sh \
# 测试使用的主题为xtopic
--topic xtopic \
# 每次拉取的数据量的大小: 1MB
--fetch-size 1048576 \
# 要消费的数据量: 5M条
--messages 5000000 \
# kafka集群
--broker-list=hadoop001:9092,hadoop002:9092,hadoop003:9092
kafka-consumer-perf-test.sh \
--topic xtopic \
--fetch-size 1048576 \
--messages 5000000 \
--broker-list=hadoop001:9092,hadoop002:9092,hadoop003:9092
查看分区和副本信息
kafka-topics.sh \
# 描述topic
--describe \
# 连接kafka集群
--bootstrap-server hadoop001:9092 \
# 设置查看的topic, 不设置代表查找全部
--topic xtopic
kafka-topics.sh \
--describe \
--bootstrap-server hadoop001:9092 \
--topic xtopic
生产者
kafka-console-producer.sh \
# 生产者从broker-list中查找topic所在的分区, 发送到这些partition分区的leader服务器
--broker-list hadoop001:9092 \
--topic xtopic
kafka-console-producer.sh \
--broker-list hadoop001:9092 \
--topic xtopic
消费者
方式一: 不查看历史信息
kafka-console-consumer.sh \ # 从kafka集群中获取消息(bootstrap-server代表kafka集群) --bootstrap-server hadoop001:9092 \ --topic xtopic # 指定消费者使用的配置文件, 可选项, 一般用于消费者 --consumer.config /opt/module/kafka_2.13-3.3.1/config/consumer.properties
方式二: 查看历史信息(不要再进行压力测试之后查看……)
kafka-console-consumer.sh \ # 从kafka集群中获取消息(bootstrap-server代表kafka集群) --bootstrap-server hadoop001:9092 \ # 查看历史消息, 相当于查看未关注公众号前, 该公众号推送过的消息 --from-beginning \ --topic xtopic
kafka-console-consumer.sh \ --bootstrap-server hadoop001:9092 \ --from-beginning \ --topic xtopic
数据实际保存位置
由于目前还不清楚具体的分区规则, 因此分别查看 xtopic-0 和xtopic-1 这两个分区中的数据. 结果在 xtopic-1 中可以查看到
查看zookeeper中保存的信息
通过
zkCli.sh
命令 来启动zookeeper客户端查看 brokers 节点信息
查看 topics 节点信息