Kafka命令行使用

启动

# 注解版, 不可直接复制使用
# 前台启动
kafka-server-start.sh \
# 配置文件所在路径
/opt/module/kafka_2.13-3.3.1/config/server.properties
# 复制使用版本
kafka-server-start.sh \
/opt/module/kafka_2.13-3.3.1/config/server.properties

# 后台启动版本
kafka-server-start.sh \
-daemon \
/opt/module/kafka_2.13-3.3.1/config/server.properties

Topic主题

创建

kafka-topics.sh \
# 创建topic
--create \
# 连接kafka broker的host和port
--bootstrap-server hadoop001:9092 \
# 设置分区数(相当于将一个主题拆分成多少个文件夹)
--partitions 2 \
# 设置副本数
--replication-factor 2 \
# 设置topic的名称为xtopic
--topic xtopic
kafka-topics.sh \
--create \
--bootstrap-server hadoop001:9092 \
--partitions 2 \
--replication-factor 2 \
--topic xtopic

压力测试

压力测试和使用的topic主题节点相关, 例如xtopic是2副本2分区, 所以压力测试相当于测试2副本2分区的集群性能.

生产者基准测试

kafka-producer-perf-test.sh \
# 测试使用的主题为xtopic
--topic xtopic \
# 数量为5M
--num-records 5000000 \
# 吞吐量为-1表示无限制
--throughput -1 \
# 数据大小限制为1000B
--record-size 1000 \
# 其他属性配置
--producer-props \
    # kafka集群
bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092 \
    # ack应答机制
acks=1
kafka-producer-perf-test.sh \
--num-records 5000000 \
--throughput -1 \
--record-size 1000 \
--producer-props \
bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092 \
acks=1 \
--topic xtopic
  • 2副本2分区的测试结果

    image-20221103175519629

  • 1副本1分区的测试结果

    image-20221103180841963

消费者基准测试

kafka-consumer-perf-test.sh \
# 测试使用的主题为xtopic
--topic xtopic \
# 每次拉取的数据量的大小: 1MB
--fetch-size 1048576 \
# 要消费的数据量: 5M条
--messages 5000000 \
# kafka集群
--broker-list=hadoop001:9092,hadoop002:9092,hadoop003:9092
kafka-consumer-perf-test.sh \
--topic xtopic \
--fetch-size 1048576 \
--messages 5000000 \
--broker-list=hadoop001:9092,hadoop002:9092,hadoop003:9092

image-20221103175816558

查看分区和副本信息

kafka-topics.sh \
# 描述topic
--describe \
# 连接kafka集群
--bootstrap-server hadoop001:9092 \
# 设置查看的topic, 不设置代表查找全部
--topic xtopic
kafka-topics.sh \
--describe \
--bootstrap-server hadoop001:9092 \
--topic xtopic

image-20221103024715228

生产者

kafka-console-producer.sh \
# 生产者从broker-list中查找topic所在的分区, 发送到这些partition分区的leader服务器
--broker-list hadoop001:9092 \
--topic xtopic
kafka-console-producer.sh \
--broker-list hadoop001:9092 \
--topic xtopic

image-20221103084853648

消费者

  • 方式一: 不查看历史信息

      kafka-console-consumer.sh \
      # 从kafka集群中获取消息(bootstrap-server代表kafka集群)
      --bootstrap-server hadoop001:9092 \
      --topic xtopic
      # 指定消费者使用的配置文件, 可选项, 一般用于消费者
      --consumer.config /opt/module/kafka_2.13-3.3.1/config/consumer.properties

    image-20221103085307742

  • 方式二: 查看历史信息(不要再进行压力测试之后查看……)

      kafka-console-consumer.sh \
      # 从kafka集群中获取消息(bootstrap-server代表kafka集群)
      --bootstrap-server hadoop001:9092 \
      # 查看历史消息, 相当于查看未关注公众号前, 该公众号推送过的消息
      --from-beginning \
      --topic xtopic
      kafka-console-consumer.sh \
      --bootstrap-server hadoop001:9092 \
      --from-beginning \
      --topic xtopic

    image-20221103091008121

数据实际保存位置

由于目前还不清楚具体的分区规则, 因此分别查看 xtopic-0xtopic-1 这两个分区中的数据. 结果在 xtopic-1 中可以查看到

image-20221103092637617

查看zookeeper中保存的信息

  1. 通过 zkCli.sh命令 来启动zookeeper客户端

  2. 查看 brokers 节点信息

    image-20221103094039135

  3. 查看 topics 节点信息


   转载规则


《》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
相同含义的对象使用数组暴力写法 List<Integer>[] redGraph = new List[n]; List<Integer>[] blueGraph = new List[n]; for (int i = 0; i
2023-05-28
下一篇 
02-安装Kafka 02-安装Kafka
单机测试创建主题 # 主题名为test kafka-topics.sh \ --create \ --zookeeper hadoop001:2181 \ --replication-factory 1 \ --partitions 1 \
2023-05-27
  目录