Kafka总结(五)Kafka 命令行详解上、topic、producer、consumer相关命令


  • 脚本列表
序号 脚本 功能
1 kafka-server-start.sh 启动kafka服务
2 kafka-server-stop.sh 停止kafka服务
3 kafka-topics.sh topic管理脚本
4 kafka-console-producer.sh kafka生产者控制台
5 kafka-replay-log-producer.sh 消费topic数据并转发到另外一个topic
5 kafka-console-consumer.sh kafka消费者控制台
7 kafka-simple-consumer-shell.sh 获取指定consumer group的位移信息
8 kafka-consumer-groups.sh kafka消费者组相关信息
9 kafka-producer-perf-test.sh kafka生产者性能测试脚本
10 kafka-consumer-perf-test.sh kafka消费者性能测试脚本
11 kafka-verifiable-consumer.sh 检验的kafka消费者
12 kafka-verifiable-producer.sh 检验的kafka生产者

1. kafka-server-start.sh

  • 功能:启动kafka broker
  • 用法:kafka-server-start.sh [-daemon] server.properties [–override property=value]*

参数说明

参数 含义
-daemon 后台运行
–override property=value 可以指定一些配置,覆盖配置文件中的配置
[root@node1 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@node1 bin]#  lsof -i:9092
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    34018 root  126u  IPv6 455181      0t0  TCP node1:XmlIpcRegSvc (LISTEN)

2. kafka-server-stop.sh

  • 功能:kill掉当前服务器上所有kafka broker

执行这个脚本结果可能为:No kafka server to stop

[root@node1 bin]# ./kafka-server-stop.sh
No kafka server to stop
[root@node1 bin]# 
  • 分析原因
    我们先看一下kafka-server-stop.sh脚本内容,这个脚本非常简单,就是得到所有包含kafka.Kafka的进程ID,但是由于kafka启动依赖比较多的jar,导致kafka进程的ps结果输出内容比较长,而ps输出结果受到PAGE_SIZE(其值通过命令getconf PAGE_SIZE可以得到)的限制,从而导致ps结果中看不到kafka.Kafka,所以不能kill掉kafka server:
[root@node1 bin]#  cat kafka-server-stop.sh
#!/bin/sh
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')

if [ -z "$PIDS" ]; then
  echo "No kafka server to stop"
  exit 1
else
  kill -s TERM $PIDS
fi

为了kafka-server-stop.sh脚本可以正常执行,建议修改脚本如下,通过bin脚本所在目录的上级目录来查找进程ID,从而kill相关进程:

[root@node1 bin]#  cp kafka-server-stop.sh kafka-server-stop-new.sh
[root@node1 bin]# 
[root@node1 bin]#  vim kafka-server-stop-new.sh
# 修改为如下内容
cd `dirname $0`
BIN_DIR=`pwd`
cd ..
DEPLOY_DIR=`pwd`
SIGNAL=${SIGNAL:-TERM}

PIDS=$(ps ax | grep -i "${DEPLOY_DIR}" | grep java | grep -v grep | awk '{print $1}')

if [ -z "$PIDS" ]; then
  echo "No kafka server to stop"
  exit 1
else
  kill -s $SIGNAL $PIDS
fi

# 保存并退出
# 执行脚本测试是否停止成功
[root@node1 bin]#  jps -l
7728 org.apache.zookeeper.server.quorum.QuorumPeerMain
34018 kafka.Kafka
34131 sun.tools.jps.Jps
[root@node1 bin]# ./kafka-server-stop-new.sh
[root@node1 bin]# 
[root@node1 bin]# 
[root@node1 bin]#  jps -l # 隔几秒钟之后查询Java进程
7728 org.apache.zookeeper.server.quorum.QuorumPeerMain
34166 sun.tools.jps.Jps

3. kafka-topics.sh

  • 功能:创建、删除、查看或修改topic

参数说明

参数 含义
–zookeeper <String: urls> Zookeeper连接的连接字符串,格式为host:port,多个用逗号隔开
–list 列出所有可用的主题
–create 创建一个新的topic
–topic <String: topic> 要创建、修改删除的topic名称,除–create选项外还可以接受正则表达式
–partitions <Integer: 分区数量> 正在创建或更改的topic的分区数
–replication-factor <Integer:replication factor> 创建topic每个partition的副本数量
–describe 查看topic详细描述信息
–unavailable-partitions 如果在描述主题时设置,则仅显示其Leader不可用的分区
–topics-with-overrides 如果在描述主题时设置,则仅显示已覆盖配置的主题
–alter 更改主题的分区数、副本分配或配置。
–replica-assignment <String:broker_id_for
_part1_replica1 :broker_id_for_part1_replica2…>
创建或修改topic,手动指定partition数,副本数在哪些broker id上
–config <String: name=value> topic 相关配置,详细配置可以使用help命令查看
–delete 删除一个topic
–delete-config <String: name> 删除配置
–if-exists 检查topic是否存在,如果存在才会做修改或删除的相关操作
–if-not-exists 检查topic是否不存在,只有在主题不存在时才会进行创建操作
–help 帮助文档


3.1 查看所有topic

# zookeeper 可以写全部zk地址也可以只写一个,写一个时要保证连接的zookeeper是正常工作的
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list
__consumer_offsets
test1
test2
test3
test4
test5

3.2 查看topic 描述信息

# 查看topic描述信息
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test6 --describe
Topic:test1    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: test1    Partition: 2    Leader: 1    Replicas: 1    Isr: 1

# 查看已覆盖配置的主题 这里没有则未显示出来
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test6 --describe --topics-with-overrides
[root@node1 bin]#

#查看分区不可用的分区 在这之前我们关掉一个节点,所以查询出一个分区不可用
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test6 --describe --unavailable-partitions
    Topic: test1    Partition: 2    Leader: 1    Replicas: 1    Isr: 1

3.3 创建topic

# 创建新的topic test6 有三个分区和每个分区有一个副本
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test6 --partitions 3 --replication-factor 1
Created topic "test6".
[root@node1 bin]#
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test6 --partitions 3 --replication-factor 1 --if-not-exists
[root@node1 bin]#

# 创建topic 手动指定分区数与副本所在的节点
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test7 --replica-assignment 0:1,1:2,2:0

# 0:1,1:2,2:0 :其中"," 分隔的是分区数,":" 分隔的是副本都在那个节点上,
# "0:1,1:2,2:0"的意思是有三个分区,分区0 在 0、1 节点,分区1 在1、2 节点,分区2在2、0 节点

Created topic "test7".
[root@node1 bin]#
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test7 --describe
Topic:test7    PartitionCount:3    ReplicationFactor:2    Configs:
    Topic: test7    Partition: 0    Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: test7    Partition: 1    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: test7    Partition: 2    Leader: 2    Replicas: 2,0    Isr: 2,0

3.4 修改topic配置

# 增加分区 将test 分区从3个增加到7个
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --alter --topic test6 --partitions 7
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

# 查看修改是否成功
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test6 --describe
Topic:test6    PartitionCount:7    ReplicationFactor:1    Configs:
    Topic: test6    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    ...
    Topic: test6    Partition: 6    Leader: 2    Replicas: 2    Isr: 2

# 注意:一般kafka 分区不做减少操作。

# 副本数量的增加会在  kafka-reassign-partitions.sh 脚本中介绍。

3.5 删除topic

[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --delete --topic test6
Topic test6 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@node1 bin]#
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list
__consumer_offsets
test1
test2
test3
test4
test5
test7
# 查看到 test6 已经删除了
# 这时我们还要检查 zookeeper中 /brokers/topics 节点下是否还有test6节点,还有kafka-logs 下面是否还有test6 相关数据是否存在,如果存在一并删掉即可

4. kafka-console-producer.sh

  • 功能:console Producer 将数据发送到kafka

参数说明

参数 含义
–topic <String: topic> 要将数据发送到哪个topic
–broker-list <String: broker-list> broker字符串,格式为host:port,多个用逗号隔开
–batch-size <Integer: size> 单批次发送消息数。 (默认:200)
–compression-codec [String: compression-codec] 压缩格式:”none”,”gzip”,”snappy”或”lz4”。默认:gzip
–line-reader <String: reader_class> 用于从标准输入中读取行的类的类名。默认情况下,每行都作为单独的消息读取。(default: kafka.tools.ConsoleProducer$LineMessageReader)
–max-block-ms <Long: max block on send> 生产者在发送请求期间阻塞的最大时间 (default: 60000)
–max-memory-bytes <Long: total memory in bytes> 生产者用来缓冲等待发送到服务器的记录的总内存。(default: 33554432)
–max-partition-memory-bytes <Long:memory in bytes per partition> 为一个分区分配的缓冲区大小。当接收到的记录小于这个大小时,生产者将尝试将它们分组,直到达到这个大小。 (default: 16384)
–message-send-max-retries 消息发送失败重试次数(default: 3)
–metadata-expiry-ms <Long: metadata expiration interval> The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any leadership changes. (default: 300000)
–producer-property String:producer_prop 自定义的属性(key=value)传递给生产者的机制。
–producer.config <String: config file> 生产者配置属性文件。注意,[producer-property]优先级更高
–property <String: prop> A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user- defined message reader.
–queue-enqueuetimeout-ms <Integer:queue enqueuetimeout ms> 队列事件超时时间 (default: 2147483647)
–queue-size <Integer: queue_size> 生产者在异步模式下运行提供最大数量的消息将排队等待足够的批大小。(default: 10000)
–request-required-acks <String: request required acks> 生产者请求所需的ack(default: 1)
–request-timeout-ms <Integer: request timeout ms> ack请求超时时间 (default: 1500)
–retry-backoff-ms 在每次重试之前,生产者刷新相关主题的元数据。由于leader election要花费一些时间,因此此属性指定生产者在刷新元数据之前等待的时间量。(default: 100)
–socket-buffer-size <Integer: size> The size of the tcp RECV size. (default: 102400)
–sync 如果设置发送消息是同步的,一个批次的消息会同时到达。
–timeout <Integer: timeout_ms> 如果设置和生产者是在异步模式下运行,这将为消息队列等待足够的批处理大小提供最大时间量。数值用ms表示。(default: 1000)
–key-serializer String:encoder_class key 序列化方式
–value-serializer <String: encoder_class> value 序列化方式

4.1 启动生产者发送数据

# 首次创建一个新的topic test8 有两个分区 
[root@node2 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test8 --partitions 2 --replication-factor 1
Created topic "test8".
#启动生产者
[root@node1 bin]# ./kafka-console-producer.sh --broker-list node1:9092 --topic test8
>1
>2
>3
>4
>5
>6
>7
>8

5. kafka-replay-log-producer.sh

  • 功能:从某个topic消费数据后发送到另外一个topic

参数说明

参数 含义
–broker-list <String: hostname:port> broker 列表
–inputtopic <String: input-topic> 消费的主题。
–zookeeper <String: zookeeper url> zookeeper地址,(default: 127.0.0.1:2181)
–outputtopic <String: output-topic> 将消息发送到哪个topic
–messages <Integer: count> 要发送的消息的数量。 (default: -1)
–property <String: producer properties> 以kv对的格式设置属性传给producer。可以覆盖配置文件中的配置
–reporting-interval <Integer: size> 打印进度信息的时间间隔。 (default: 5000)
–sync 如果将消息设置为同步发送给broker,则在它们到达时一次发送一个请求。
–threads <Integer: threads> 发送线程的数量。 (default: 1)

# 从topic test3 消费数据发送到 test8,消费10条数据后自动退出

./kafka-replay-log-producer.sh --inputtopic test3  --outputtopic test8 --broker-list node1:9092,node2:9092,node3:9092  --messages 10 

# 启动消费者消费test8 的数据
[root@node3 bin]# ./kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test8
1
2
3
4
5
6
7
8
9
10
^CProcessed a total of 10 messages

6. kafka-console-consumer.sh

  • 功能:从kafka 总消费数据输出到控制台

参数说明

参数 含义
–topic <String: topic> 要消费数据的topic
–zookeeper <String: urls> Zookeeper连接的连接字符串,格式为host:port,多个用逗号隔开
–bootstrap-server <String: server to connect to> 要连接的broker列表
–timeout-ms <Integer: timeout_ms> 如果指定,则在指定的时间间隔内没有可用消息时退出。
–max-messages <Integer: num_messages> 消耗多少条数据就退出。 如果未设置,会一直消费
–from-beginning 从最早的数据开始消费
–offset <String: consume offset> 默认:latest,要消费数据的offset,或者使用 earliest 消费最早的数据,或者使用 latest 消费最新的数据
–partition <Integer: partition> 要消费数据的分区。 在没有指定 ‘–offset’的情况下从末尾开始消费
–enable-systest-events 除了记录消耗的消息外,还记录使用者的生命周期事件。(这特定于系统测试。)
–group <String: consumer group id> consumer group id
–formatter <String: class> 用于格式化kafka消息以供显示的类的名称。(default: kafka.tools. DefaultMessageFormatter)
–isolation-level 默认值:read_uncommitted
设置为read_committed可以过滤掉未提交的事务性消息。
设置为read_uncommitted以读取所有消息。
–delete-consumer-offsets 如果指定,则在启动时删除zookeeper中的使用者路径
–key-deserializer <String:deserializer for key> key 序列化方式
–value-deserializer <String: deserializer for values> value 序列化方式
–skip-message-on-error 如果在处理消息时出现错误就跳过该消息
–property <String: prop> 用于初始化消息格式化程序的属性。
–metrics-dir <String: metrics directory> 如果设置了csv-reporter-enable,并且设置了此参数,则将在此处输出csv指标
–csv-reporter-enabled 如果设置,则将启用CSV指标报告器
–blacklist <String: blacklist> 消费topic的黑名单
–whitelist <String: whitelist> 消费topic的白名单
–consumer-property <String: consumer_prop> 传参 kv对的格式
–consumer.config <String: config file> Consumer配置属性文件。 [consumer-property]配置的优先级更高

6.1 启动消费者消费数据

[root@node2 bin]# ./kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test7
1
2
3
4
5
^CProcessed a total of 5 messages

[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test7
6
7
^CProcessed a total of 2 messages

6.2 自动退出

  • 启动消费者10秒钟后自动退出

[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test7 --timeout-ms 10000
8
[2020-02-19 17:10:23,807] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
kafka.consumer.ConsumerTimeoutException
    at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:98)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:129)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 1 messages
  • 消费5条数据自动退出

[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test7 --max-messages 5
16
17
18
19
20
Processed a total of 5 messages

6.3 从最早位置开始消费

./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test7 --from-beginning
1
2
3
4
5
^CProcessed a total of 5 messages

6.4 指定分区指定offset消费数据

offset 需要与partition参数一起使用

# 消费partition1 的数据从offset=3 开始消费
[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test8 --offset 3 --partition 1
7
9
^CProcessed a total of 2 messages

# 消费partition1 的数据从offset=0 开始消费

[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test8 --offset 0 --partition 1
1 ->offset=0
3 ->offset=1
5 ->offset=2
7 ->offset=3
9 ->offset=4

# 消费partition0 的数据从offset=0 开始消费
[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test8 --offset 0 --partition 0
2 ->offset=0
4 ->offset=1
6 ->offset=2
8 ->offset=3
10 ->offset=4
^CProcessed a total of 5 messages

6.5 使用消费组


# 生产者生产数据
[root@node1 bin]# ./kafka-console-producer.sh --broker-list node1:9092 --topic test8
>11
>12
>13
>14
>15

#node3 启动消费者
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test8  --group g1
11
13
15

#node2 启动消费者
[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test8  --group g1
12
14

7. kafka-simple-consumer-shell.sh

  • 功能:A low-level 工具,能直接从特定副本获取数据,标注为过期,它将在0.11.0.0之后的将来版本中删除

参数说明

参数 含义
–broker-list <String: servers> 必须: server连接的broker字符串,格式为host:port,多个用逗号隔开
–topic <String: topic> 消费的主题.
–partition <Integer: partition> 要消费的分区(default: 0)
–print-offsets 打印偏移量
–max-messages <Integer: max-messages> 要消费的消息数量 (default: 2147483647)
–no-wait-at-logend 如果设置此参数, 则当simple consumer消费完数据之后就会停止,而不会等待新的消息到来
–offset <Long: consume offset> 要使用的offsetid,默认为-2,表示从开始;而值-1表示从末端开始 (default: -2)
–max-wait-ms <Integer: ms> 每个fetch请求的最大等待时间。(default: 1000)
–formatter <String: class> 要用于格式化显示的kafka消息的类的名称。(default: kafka.tools. DefaultMessageFormatter)
–replica <Integer: replica id> 消费的副本id, 默认消费Leader -1 (default: -1)
–fetchsize <Integer: fetchsize> 每个请求获取的大小. (default: 1048576)
–skip-message-on-error 如果在处理消息时出现错误,跳过它而不是停止。
–clientId <String: clientId> 该客户端的ID(default: SimpleConsumerShell)

7.1 启动消费者消费数据

# 消费数据,默认消费分区0的数据
[root@node2 bin]# ./kafka-simple-consumer-shell.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test8
[2020-02-19 21:50:46,340] WARN WARNING: SimpleConsumerShell is deprecated and will be dropped in a future release following 0.11.0.0. (kafka.tools.SimpleConsumerShell$)
2
4
6
8
10
12
14

7.2 消费数据指定分区

[root@node2 bin]# ./kafka-simple-consumer-shell.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test8 --partition 1
[2020-02-19 21:56:37,987] WARN WARNING: SimpleConsumerShell is deprecated and will be dropped in a future release following 0.11.0.0. (kafka.tools.SimpleConsumerShell$)
1
3
5
7
9
11
13
15

7.3 消费数据打印offset

[root@node2 bin]# ./kafka-simple-consumer-shell.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test8 --partition 1 --print-offsets
[2020-02-19 21:59:08,651] WARN WARNING: SimpleConsumerShell is deprecated and will be dropped in a future release following 0.11.0.0. (kafka.tools.SimpleConsumerShell$)
next offset = 1
1
next offset = 2
3
next offset = 3
5
next offset = 4
7
next offset = 5
9
next offset = 6
11
next offset = 7
13
next offset = 8
15

7.4 自动退出

# 消费3条数据后退出
[root@node2 bin]# ./kafka-simple-consumer-shell.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test8 --partition 1 --max-messages 3
1
3
5

# 消费完数据后自动退出
[root@node2 bin]# ./kafka-simple-consumer-shell.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test8 --partition 1 --no-wait-at-logend
1
3
5
7
9
11
13
15
Terminating. Reached the end of partition (test8, 1) at offset 8

7.5 指定offset 消费

# 从offset = 2的位置开始消费到最后
[root@node2 bin]# ./kafka-simple-consumer-shell.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test8 --partition 1 --offset 2
5
7
9
11
13
15

# 从offset = 2的位置开始,消费3条数据
[root@node2 bin]# ./kafka-simple-consumer-shell.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test8 --partition 1 --offset 2 --max-messages 3
5
7
9

8. kafka-consumer-groups.sh

  • 功能:列出所有用户组,描述用户组,删除用户组信息,或重置用户组offset。

参数说明

参数 含义
–zookeeper <String: urls> Zookeeper连接的连接字符串,格式为host:port,多个用逗号隔开
–bootstrap-server <String: server> 要连接的服务器
–list 查看 consumer groups 列表.
–describe 描述消费者组并列出与给定组相关的偏移滞后(尚未处理的消息数量)。
–execute 查看消费者组并列出与给定组相关的偏移滞后(尚未处理的消息数量)。
–group <String: consumer group> 进行操作的consumer group
–reset-offsets 重置用户组的offset。同时支持一个消费者组,实例应该是不活动的,有3个执行选项
–to-current 将offset重置为当前offset。
–to-earliest 将offset重置为最早的offset。
–to-latest 将offset重置为最新offset。
–shift-by <Long: number-of-offsets> 移动offset n 位,n可以为正数或者负数
–to-offset <Long: offset> 将offset重置为特定offset。
–delete 传递组来删除整个使用者组上的主题分区offset和所有权信息。
–by-duration <String: duration> 重置offset,使其由当前时间戳的持续时间偏移。Format: ‘PnDTnHnMnS’
–all-topics 考虑”重置偏移”过程中分配给组的所有主题。
–export 导出操作执行到CSV文件。支持的操作: reset-offsets。
–from-file <String: path to CSV file> 将offset重置为CSV文件中定义的值。
–command-config <String: command config property file> 包含要传递给管理客户端和使用者的配置的属性文件。
–timeout <Long: timeout (ms)> 可以为某些用例设置超时。 例如,在描述组时可以使用它来指定在组稳定之前等待的最长时间(以毫秒为单位)(当组刚刚创建或正在进行一些更改时)。 (默认值:5000)
–to-datetime <String: datetime> 将offset重置为与日期时间的offset。 Format: ‘YYYY-MM-DDTHH:mm:SS.sss’
–topic <String: topic> 应删除其消费者组信息的主题或应将其包含在重置offset过程中的主题。

8.1 查看消费组list

#查看使用kafka保存的group
[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
console-consumer-24410
console-consumer-99327
g1
...
perf-consumer-19144

#查看使用zookeeper保存的group
[root@node2 bin]# ./kafka-consumer-groups.sh --zookeeper node1:2181,node2:2181,node3:2181 --li
console-consumer-64682
console-consumer-18902

8.2 查看消费组信息

[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group g1
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test8  1          8               8               0    -            -     -
test8  0          7               7               0    -            -     -

# 当消费正正在消费时
[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --execute --group g1
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                       HOST            CLIENT-ID
test8  0          7               7               0    consumer-1-d4f24bbe-1c9b-487c-880b-300d140df1d2   /172.16.72.152  consumer-1
test8  1          8               8               0    consumer-1-d4f24bbe-1c9b-487c-880b-300d140df1d2   /172.16.72.152  consumer-1
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic名字 分区id 当前已消费的条数 总条数 未消费的条数 消费id 主机ip 客户端id

8.3 重置offset

8.3.1 重置到最早的offset

[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --reset-offsets --to-earliest --topic test8 --execute --group g1
TOPIC                          PARTITION  NEW-OFFSET
test8                          1          0
test8                          0          0
[root@node2 bin]#
[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group g1
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test8  1          0               8               8    -            -     -
test8  0          0               7               7    -            -     -

# 启动消费者能够消费到所有数据
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test8  --group g1 --from-beginning
1
...
14

8.3.2 重置到最后的offset

[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --reset-offsets --to-latest  --topic test8 --group g1 --execute
TOPIC                          PARTITION  NEW-OFFSET
test8                          1          8
test8                          0          7
[root@node2 bin]#
[root@node2 bin]#
[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group g1
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test8  1          8               8               0    -            -     -
test8  0          7               7               0    -            -     -

# 启动消费者消费不到任何数据,因为我们已经把ooffset移动到了最后
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test8  --group g1 --from-beginning

8.3.3 offset 向前移动2位

[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --reset-offsets --shift-by  -2  --topic test8 --group g1 --execute
TOPIC                          PARTITION  NEW-OFFSET
test8                          1          6
test8                          0          5
[root@node2 bin]#
[root@node2 bin]#
[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group g1
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test8  1          6               8               2    -            -     -
test8  0          5               7               2    -            -     -

# 能消费到4条数据,以为有两个分区每个分区向前移动两位offset
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test8  --group g1 --from-beginning
13
15
12
14

8.3.4 将offset移动到指定位置

[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --reset-offsets --to-offset 3  --topic test8 --group g1 --execute
TOPIC                          PARTITION  NEW-OFFSET
test8                          1          3
test8                          0          3
[root@node2 bin]#
[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group g1
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test8  1          3               8               5    -            -     -
test8  0          3               7               4    -            -     -

# 启动消费者 每个分区都能消费到剩余数量的数据
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test8  --group g1 --from-beginning
7
9
11
13
15
8
10
12
14

注意:只有在该group 没有正在运行的消费者时才能重置offset

[root@node2 bin]# ./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --reset-offsets --to-current  --topic test8 --group g1
Error: Assignments can only be reset if the group 'g1' is inactive, but the current state is Stable.

TOPIC                          PARTITION  NEW-OFFSET

9. kafka-producer-perf-test.sh

  • 功能:kafka自带压测命令
[root@node1 bin]# ./kafka-producer-perf-test.sh --topic test7 --num-records 100000 --record-size 1 --throughput 100000  --producer-props bootstrap.servers=node1:9092
100000 records sent, 67204.301075 records/sec (0.06 MB/sec), 11.44 ms avg latency, 227.00 ms max latency, 10 ms 50th, 26 ms 95th, 34 ms 99th, 61 ms 99.9th.
结果 说明
100000 records sent 共发送 100000(10w)条记录
67204.301075 records/sec (0.06 MB/sec) 67204.301075条记录/秒(0.06 MB/秒)
11.44 ms avg latency 平均延迟11.44 ms
227.00 ms max latency 最大延迟227.00 ms
10 ms 50th 第50秒10ms
26 ms 95th 第95秒26ms
34 ms 99th 第99秒34ms
61 ms 99.9th 第99.9秒61ms

10. kafka-consumer-perf-test.sh

  • 功能:

参数说明

参数 含义
–broker-list <String: servers> broker字符串,格式为host:port,多个用逗号隔开
–messages <Long: count> 要消费的消息数量
–topic <String: topic> 要消费的topic
–zookeeper <String: urls> Zookeeper连接的连接字符串,格式为host:port,多个用逗号隔开
–consumer.config <String: config file> Consumer配置文件
–date-format <String: date format> 用于格式化时间字段的日期格式。看到text。SimpleDateFormat选项。(default: yyyy-MM-dd HH:mm:ss:SSS)
–fetch-size <Integer: size> 在单个请求中获取的数据量。(default: 1048576)
–from-latest 如果使用者还没有已建立的offset可供使用,则从日志中出现的最新消息开始,而不是从最早的消息开始。
–group <String: gid> 消费组ID (default: perf-consumer-87178)
–hide-header 如果设置,则不显示打印统计信息的标题
–num-fetch-threads <Integer: count> fetcher线程的数量 (default: 1)
–reporting-interval Integer:interval_ms 打印进度信息的间隔 单位毫秒。 (default: 5000)
–show-detailed-stats 每隔指定的时间返回一次状态
–socket-buffer-size <Integer: size> The size of the tcp RECV size. (default: 2097152)
–threads <Integer: count> 线程数 (default: 10)
–help 打印帮助信息

[root@node2 bin]# ./kafka-consumer-perf-test.sh --broker-list ode1:9092,node2:9092,node3:9092 --topic test7 --messages 1000
start.time,               end.time,                data.consumed.in.MB,     MB.sec,  data.consumed.in.nMsg, nMsg.sec,   rebalance.time.ms, fetch.time.ms,  fetch.MB.sec, fetch.nMsg.sec
2020-02-19 23:45:14:743,  2020-02-19 23:45:15:027, 0.0010,                  0.0034,  1000,                  3521.1268,   24,               260,            0.0037,       3846.1538

性能测试扩展阅读:https://www.cnblogs.com/xiaodf/p/6023531.html

11. kafka-verifiable-consumer.sh

  • 功能:接收指定topic的消息消费,并发出消费者事件,例如:offset提交等。
[root@node3 bin]# ./kafka-verifiable-consumer.sh  --broker-list node1:9092,node2:9092,node3:9092 --topic test8  --group g1
{"timestamp":1582127921386,"name":"startup_complete"}
{"timestamp":1582127921557,"name":"partitions_revoked","partitions":[]}
{"timestamp":1582127921584,"name":"partitions_assigned","partitions":[{"topic":"test8","partition":1},{"topic":"test8","partition":0}]}
{"timestamp":1582127936593,"name":"partitions_revoked","partitions":[{"topic":"test8","partition":1},{"topic":"test8","partition":0}]}
{"timestamp":1582127936606,"name":"partitions_assigned","partitions":[{"topic":"test8","partition":1}]}

12. kafka-verifiable-producer.sh

  • 功能:建议使用该脚本时增加参数–max-messages,否则会不停的发送消息。这个脚本的作用是持续发送消息到指定的topic中,参数–max-messages限制最大发送消息数。且每条发送的消息都会有响应信息,这就是和kafka-console-producer.sh最大的不同。
参数 含义
–topic TOPIC 将数据发送到哪个topic
–broker-list HOST1:PORT1[,HOST2:PORT2[…]] 连接的brokers
–max-messages MAX-MESSAGES 生产多少数据,-1 为不限制
–throughput THROUGHPUT If set >= 0, throttle maximum message throughput to approximately THROUGHPUT messages/sec. (default: -1)
–acks ACKS Acks required on each produced message. See Kafka docs on acks for details. (default: -1)
–producer.config CONFIG_FILE 配置文件
–message-create-time CREATETIME 发送消息,其创建时间从参数值开始,以毫秒为单位
–value-prefix VALUE-PREFIX If specified, each produced value will have this prefix with a dot separator
[root@node3 bin]# ./kafka-verifiable-producer.sh --broker-list node1:9092 --topic test8 --max-messages 4
{"timestamp":1582128257099,"name":"startup_complete"}
{"timestamp":1582128257282,"name":"producer_send_success","key":null,"value":"1","offset":8,"topic":"test8","partition":0}
{"timestamp":1582128257285,"name":"producer_send_success","key":null,"value":"3","offset":9,"topic":"test8","partition":0}
{"timestamp":1582128257285,"name":"producer_send_success","key":null,"value":"0","offset":8,"topic":"test8","partition":1}
{"timestamp":1582128257285,"name":"producer_send_success","key":null,"value":"2","offset":9,"topic":"test8","partition":1}
{"timestamp":1582128257295,"name":"shutdown_complete"}
{"timestamp":1582128257296,"name":"tool_data","sent":4,"acked":4,"target_throughput":-1,"avg_throughput":20.30456852791878}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Kafka总结(五)Kafka 命令行详解下、配置、副本、测试等相关命令 Kafka总结(五)Kafka 命令行详解下、配置、副本、测试等相关命令
脚本列表 序号 脚本 功能 1 connect-distributed.sh && connect-standalone.sh 连接kafka集群/单机模式 2 kafka-run-class.sh 执行
2020-02-19
下一篇 
Java中线程状态与线程死锁 Java中线程状态与线程死锁
1. 什么是线程 线程(英语:thread)是操作系统能够进行运算调度的最小单位。大部分情况下,它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在
2020-02-15
  目录