Kafka Summary (5) Kafka Command Line Tools Explained, Part 2: Configuration, Replicas, Testing and Related Commands


  • Script list

No.  Script  Function
1  connect-distributed.sh && connect-standalone.sh  Run Kafka Connect in distributed / standalone mode
2  kafka-run-class.sh  Run tool classes such as those under kafka.tools
3  kafka-reassign-partitions.sh  Partition reassignment script
4  kafka-configs.sh  Configuration management script
5  kafka-delete-records.sh  Delete records below a specified offset (the new low watermark)
6  kafka-log-dirs.sh  Kafka message log directory information
7  kafka-preferred-replica-election.sh  Trigger preferred replica election
8  kafka-replica-verification.sh  Replication progress verification script
9  kafka-mirror-maker.sh  Cross-data-center Kafka cluster replication tool
10  kafka-acls.sh  ACL (permission) management
11  kafka-streams-application-reset.sh  Kafka Streams application reset tool

1. connect-distributed.sh && connect-standalone.sh

Kafka Connect was introduced in version 0.9. It is mainly used to import data from other systems into Kafka and to export data from Kafka to other systems, so it can serve as real-time ETL for data synchronization, real-time analytics and similar pipelines.
It has two modes: Standalone (single process) and Distributed.
Standalone mode is mainly for development and testing; distributed mode is for production.
Usage is fairly involved; see https://www.orchome.com/344 for details.
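
As a quick illustration, a minimal sketch of starting Connect in each mode, assuming you run from the bin directory and use the sample worker and file connector configs shipped under config/ (the relative paths are an assumption of this sketch):

# Standalone mode: one worker process, connector configs passed on the command line
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties

# Distributed mode: workers form a group, connectors are then managed via the REST API
./connect-distributed.sh ../config/connect-distributed.properties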

2. kafka-run-class.sh

  • Function: USAGE: ./kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]

Parameter description (for the kafka.tools.ExportZkOffsets example below)

Parameter  Meaning
--group  Consumer group
--output-file  Output file
--zkconnect  ZooKeeper connection string
--help  Print help information

[root@node2 bin]# ./kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect node1:2181 --group console-consumer-64682 --output-file out_offsets
[2020-02-19 23:21:22,209] WARN WARNING: ExportZkOffsets is deprecated and will be dropped in a future release following 0.11.0.0. (kafka.tools.ExportZkOffsets$)


# View the exported file
[root@node2 bin]# cat out_offsets
/consumers/console-consumer-64682/offsets/test7/0:2
/consumers/console-consumer-64682/offsets/test7/1:1
/consumers/console-consumer-64682/offsets/test7/2:2

# Dump log segment information
[root@node2 bin]# ./kafka-run-class.sh kafka.tools.DumpLogSegments --files ../kafka-logs/test8-0/00000000000000000000.log
Dumping ../kafka-logs/test8-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0 CreateTime: 1582105410382 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 4147696842
baseOffset: 2 lastOffset: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 78 CreateTime: 1582105433073 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 3173743410
baseOffset: 4 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 156 CreateTime: 1582105443091 isvalid: true size: 70 magic: 2 compresscodec: NONE crc: 3076317218
baseOffset: 5 lastOffset: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 226 CreateTime: 1582106506420 isvalid: true size: 70 magic: 2 compresscodec: NONE crc: 1906249676
baseOffset: 6 lastOffset: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 296 CreateTime: 1582106508354 isvalid: true size: 70 magic: 2 compresscodec: NONE crc: 253490739


[root@node2 bin]# ./kafka-run-class.sh kafka.tools.DumpLogSegments --files ../kafka-logs/test8-0/00000000000000000000.index
Dumping ../kafka-logs/test8-0/00000000000000000000.index
offset: 0 position: 0
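
kafka-run-class.sh can run any class on the Kafka classpath. Another tool frequently run this way is kafka.tools.GetOffsetShell, which prints the earliest/latest offsets of a topic's partitions; a hedged sketch reusing the test8 topic from the examples above:

# --time -1 returns the latest offsets, --time -2 the earliest
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092,node2:9092,node3:9092 --topic test8 --time -1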

3. kafka-reassign-partitions.sh

  • Function: move topic partition replicas between brokers (and between log directories).

Parameter description

Parameter  Meaning
--zookeeper <String: urls>  ZooKeeper connection string in host:port form; separate multiple entries with commas
--reassignment-json-file <String: manual assignment json file path>  JSON file with the manual assignment. The format is {"partitions": [{"topic": "foo", "partition": 1, "replicas": [1,2,3], "log_dirs": ["dir1","dir2","dir3"]}], "version":1}. Note that "log_dirs" is optional; when specified, its length must equal that of the replicas list. Each value in the list is either "any" or an absolute log directory path on the broker. If an absolute path is given, it is currently required that the replica does not yet exist on that broker; the replica will then be created in the specified log directory
--verify  Verify whether the reassignment specified by --reassignment-json-file has completed. If a throttle was set for the reassigned replicas and the rebalance has completed, the throttle is removed
--execute  Start the reassignment specified by --reassignment-json-file
--bootstrap-server <String: Server(s) to use for bootstrapping>  Brokers to bootstrap from. Required if an absolute log directory path is specified for any replica in the reassignment JSON file
--broker-list <String: brokerlist>  List of brokers to reassign partitions to, in the form "0,1,2". Required when generating a reassignment configuration with --topics-to-move-json-file
--generate  Generate a candidate partition reassignment configuration. Note that this only generates a candidate assignment; it does not execute it
--throttle <Long: throttle>  Partition movement is throttled to this value (bytes/sec). Re-running with this option while a rebalance is in progress changes the throttle value. The throttle rate should be at least 1 KB/s (default: -1, i.e. no throttle)
--timeout <Long: timeout>  Maximum time to wait for the partition reassignment execution to start successfully (default: 10000)
--topics-to-move-json-file <String: topics to reassign json file path>  Generate a reassignment configuration that moves the partitions of the listed topics to the brokers given by --broker-list. The format is {"topics": [{"topic": "foo"},{"topic": "foo1"}], "version":1}
--disable-rack-aware  Disable rack-aware replica assignment
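
Before writing a reassignment JSON by hand, you can also let the tool propose one with --generate; a hedged sketch (the topics-to-move.json file name is illustrative):

# topics-to-move.json: {"topics": [{"topic": "test8"}], "version": 1}
./kafka-reassign-partitions.sh --zookeeper node1:2181 --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate
# The tool prints the current assignment and a proposed assignment;
# save the proposed one to a file and run it with --execute.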

3.1 Moving replicas

# View the current replica assignment
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test8 --describe
Topic:test8    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test8    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
    Topic: test8    Partition: 1    Leader: 2    Replicas: 2    Isr: 2

# Move the replica of partition 0 to broker 2 and the replica of partition 1 to broker 0


Edit the reassignment file and save it as topic-reassignment.json:
{"partitions":[{"topic":"test8","partition":0,"replicas":[2]},{"topic":"test8","partition":1,"replicas":[0]}],"version":1}

# Execute the reassignment
[root@node1 bin]# ./kafka-reassign-partitions.sh --zookeeper node1:2181 --reassignment-json-file ./topic-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test8","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"test8","partition":1,"replicas":[2],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

# Check the partition distribution of the topic; the replicas have been moved as configured
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test8 --describe
Topic:test8    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test8    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    Topic: test8    Partition: 1    Leader: 0    Replicas: 0    Isr: 0

# Verify whether the reassignment succeeded
[root@node1 bin]# ./kafka-reassign-partitions.sh --zookeeper node1:2181 --reassignment-json-file ./topic-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition test8-0 completed successfully
Reassignment of partition test8-1 completed successfully

3.2 Adding replicas

Add a replica of partition 0 on broker 1 and a replica of partition 1 on broker 1

# First edit the JSON file for the expanded replica assignment
vim add-re.json
{"partitions":[{"topic":"test8","partition":0,"replicas":[2,1]},{"topic":"test8","partition":1,"replicas":[0,1]}],"version":1}


[root@node1 bin]# ./kafka-reassign-partitions.sh --zookeeper node1:2181 --reassignment-json-file ./add-re.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test8","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"test8","partition":1,"replicas":[0],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

# Check the partition distribution
[root@node1 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test8 --describe
Topic:test8    PartitionCount:2    ReplicationFactor:2    Configs:
    Topic: test8    Partition: 0    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: test8    Partition: 1    Leader: 0    Replicas: 0,1    Isr: 0,1

# Verify whether the reassignment succeeded
[root@node1 bin]# ./kafka-reassign-partitions.sh --zookeeper node1:2181 --reassignment-json-file ./add-re.json --verify
Status of partition reassignment:
Reassignment of partition test8-0 completed successfully
Reassignment of partition test8-1 completed successfully
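
When moving replicas of large partitions on a busy cluster, the --throttle option described above can limit the replication bandwidth used by the reassignment; a hedged sketch (the 50 MB/s value is illustrative):

# Start (or re-run) the reassignment with replication throttled to 50 MB/s
./kafka-reassign-partitions.sh --zookeeper node1:2181 --reassignment-json-file ./add-re.json --execute --throttle 52428800
# Running --verify after completion also removes the throttle
./kafka-reassign-partitions.sh --zookeeper node1:2181 --reassignment-json-file ./add-re.json --verify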

4. kafka-configs.sh

  • Function: add/remove entity configs for topics, clients, users or brokers

Parameter description

Parameter  Meaning
--zookeeper <String: urls>  ZooKeeper connection string in host:port form; separate multiple entries with commas
--describe  List configs for the given entity
--add-config  Key=value pairs of configs to add. Square brackets can be used to group values containing commas: "k1=v1,k2=[v1,v2,v2],k3=v3". The valid config keys depend on the entity_type
--alter  Alter the configuration of the entity
--delete-config  Config keys to remove
--entity-default  Default entity name for clients/users (applies to the corresponding entity type in the command line)
--entity-name  Name of the entity (topic name / client id / user principal name / broker id)
--entity-type  Type of the entity (topics/clients/users/brokers)
--force  Suppress console prompts
--help  Print help information

4.1 Viewing configs

# View topic configs
[root@node1 bin]# ./kafka-configs.sh --zookeeper node1:2181 --describe --entity-type topics
Configs for topic 'test4' are
Configs for topic 'test5' are
Configs for topic 'test2' are
Configs for topic 'test3' are
Configs for topic 'test8' are
Configs for topic 'test7' are
Configs for topic 'test1' are
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

4.2 Adding a config


[root@node1 bin]# ./kafka-configs.sh --zookeeper node1:2181 --alter --entity-type topics --entity-name test1 --add-config flush.ms=5000
Completed Updating config for entity: topic 'test1'.
[root@node1 bin]#
# View the newly added config
[root@node1 bin]# ./kafka-configs.sh --zookeeper node1:2181 --describe --entity-type topics
Configs for topic 'test4' are
Configs for topic 'test5' are
Configs for topic 'test2' are
Configs for topic 'test3' are
Configs for topic 'test8' are
Configs for topic 'test7' are
Configs for topic 'test1' are flush.ms=5000 # the config we just added
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

4.3 Deleting a config

# Delete the config
[root@node1 bin]# ./kafka-configs.sh --zookeeper node1:2181 --alter --entity-type topics --entity-name test1 --delete-config  flush.ms
Completed Updating config for entity: topic 'test1'.
# View topic configs
[root@node1 bin]# ./kafka-configs.sh --zookeeper node1:2181 --describe --entity-type topics
Configs for topic 'test4' are
Configs for topic 'test5' are
Configs for topic 'test2' are
Configs for topic 'test3' are
Configs for topic 'test8' are
Configs for topic 'test7' are
Configs for topic 'test1' are # the config added in the previous step has been removed
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
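
Besides topic configs, kafka-configs.sh is also commonly used to set client quotas; a hedged sketch (the client id "clientA" and the byte rates are illustrative; producer_byte_rate and consumer_byte_rate are the standard quota keys):

# Limit client "clientA" to 1 MB/s produce and 2 MB/s consume
./kafka-configs.sh --zookeeper node1:2181 --alter --entity-type clients --entity-name clientA --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152'
# Check the quota
./kafka-configs.sh --zookeeper node1:2181 --describe --entity-type clients --entity-name clientA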

5. kafka-delete-records.sh

  • Function: delete records of the specified partitions up to the given offset (advancing the partition low watermark)

Parameter description

Parameter  Meaning
--bootstrap-server <String: server(s)>  Brokers to connect to
--offset-json-file <String: Offset json file path>  JSON file specifying an offset per partition. The format is {"partitions": [{"topic": "foo", "partition": 1, "offset": 1}], "version":1}
--command-config <String: command config property file path>  Property file containing configs to pass to the Admin Client

5.1 Deleting data

# First edit the JSON file
vim offset.json
{"partitions":[{"topic":"test8", "partition":1, "offset": 5}], "version":1}

# Check the offsets of test8 partition 1 (output of kafka-simple-consumer-shell.sh with --print-offsets, same command as in 5.3 below)
[2020-02-20 16:50:40,821] WARN WARNING: SimpleConsumerShell is deprecated and will be dropped in a future release following 0.11.0.0. (kafka.tools.SimpleConsumerShell$)
next offset = 1
1
next offset = 2
3
next offset = 3
5
next offset = 4
7
next offset = 5
9
next offset = 6
11
next offset = 7
13
next offset = 8
15
next offset = 9
0
next offset = 10
2

# Execute the record deletion

[root@node1 bin]# ./kafka-delete-records.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --offset-json-file offset.json
Executing records delete operation
Records delete operation completed:
partition: test8-1    low_watermark: 5

# Check the offsets again

[2020-02-20 16:51:23,834] WARN WARNING: SimpleConsumerShell is deprecated and will be dropped in a future release following 0.11.0.0. (kafka.tools.SimpleConsumerShell$)
next offset = 6
11
next offset = 7
13
next offset = 8
15
next offset = 9
0
next offset = 10
2

As shown, all records before offset 5 have been deleted.

5.2 Deleting with an offset larger than the existing data

# Edit the JSON file as:
{"partitions":[{"topic":"test8", "partition":1, "offset": 5}], "version":1}

[root@node1 bin]# ./kafka-delete-records.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --offset-json-file offset.json
Executing records delete operation
Records delete operation completed:
partition: test8-1    error: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.
# An OffsetOutOfRangeException is thrown because the requested offset is outside the range kept by the server

5.3 Deleting with an offset equal to the largest existing offset

# Edit the JSON file as:
{"partitions":[{"topic":"test8", "partition":1, "offset": 10}], "version":1}

[root@node1 bin]# ./kafka-delete-records.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --offset-json-file offset.json
Executing records delete operation
Records delete operation completed:
partition: test8-1    low_watermark: 10

# Check the offsets again: there is no data left, i.e. the partition has been emptied
[root@node2 bin]# ./kafka-simple-consumer-shell.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test8 --partition 1 --print-offsets

6. kafka-log-dirs.sh

  • Function: query the log directory information of the brokers (which partitions each log directory holds and their sizes)

Parameter description

Parameter  Meaning
--bootstrap-server <String: The server(s) to use for bootstrapping>  Brokers to connect to
--broker-list <String: Broker list>  Brokers to query, in the form "0,1,2". If no list is given, all brokers in the cluster are queried
--describe  Describe the specified log directories on the specified brokers
--topic-list <String: Topic list>  Topics to query, in the form "topic1,topic2,topic3". If no topic list is given, all topics are queried


[root@node2 bin]# ./kafka-log-dirs.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --topic-list test1,test2
Querying brokers for log directories information
Received log directory information from brokers 0,1,2

{
    "version": 1,
    "brokers": [
        {
            "broker": 0,
            "logDirs": [
                {
                    "logDir": "/opt/kafka_2.12-1.0.2/kafka-logs",
                    "error": null,
                    "partitions": [
                        {
                            "partition": "test2-1",
                            "size": 0,
                            "offsetLag": 0,
                            "isFuture": false
                        },
                        {
                            "partition": "test1-1",
                            "size": 17460,
                            "offsetLag": 0,
                            "isFuture": false
                        }
                    ]
                }
            ]
        },
        {
            "broker": 1,
            ...
        },
        {
            "broker": 2,
            ...
        }
    ]
}
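
The tool prints the result as a single JSON line after two header lines, so it is easy to post-process; a hedged sketch, assuming jq is installed:

# Extract partition sizes from the JSON output (the grep keeps only the JSON line)
./kafka-log-dirs.sh --bootstrap-server node1:9092 --describe --topic-list test1 | grep '^{' | jq '.brokers[].logDirs[].partitions[] | {partition, size}'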

7. kafka-preferred-replica-election.sh

  • Function: manually rebalance partition leaders (trigger preferred replica election)

Parameter description

Kafka also provides a broker parameter, auto.leader.rebalance.enable (default true), that does this automatically: a background thread checks leader imbalance and triggers leader rebalancing. However, it is not recommended to keep it enabled, because the automatic election may happen during business peak hours and affect the workload; see the server.properties sketch after this section's example.

Parameter  Meaning
--zookeeper <String: urls>  ZooKeeper connection string in host:port form; separate multiple entries with commas
--path-to-json-file <String: list of partitions for which preferred replica leader election needs to be triggered>  JSON file containing the list of partitions for which preferred replica election should be triggered, in the format {"partitions": [{"topic": "foo", "partition": 1}, {"topic": "foobar", "partition": 2}]}. Defaults to all existing partitions

# Edit the JSON file listing the partitions for which to trigger the election
vim leader.json
{"partitions":[{"topic": "test3", "partition": 0},{"topic": "test3", "partition": 1}, {"topic": "test3", "partition": 2}]}

# First check the leader distribution of topic test3: all leaders are currently on broker 2
[root@node2 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test3 --describe
Topic:test3    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test3    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0,1
    Topic: test3    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: test3    Partition: 2    Leader: 2    Replicas: 0,1,2    Isr: 2,0,1

# Rebalance the partition leaders (trigger the preferred replica election)
[root@node2 bin]# ./kafka-preferred-replica-election.sh --zookeeper node1:2181,node2:2181,node3:2181 --path-to-json-file ./leader.json
Created preferred replica election path with {"version":1,"partitions":[{"topic":"test3","partition":0},{"topic":"test3","partition":1},{"topic":"test3","partition":2}]}
Successfully started preferred replica election for partitions Set(test3-0, test3-1, test3-2)
[root@node2 bin]#

# Check the leader distribution again: leaders have been rebalanced onto the preferred replicas
[root@node2 bin]# ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test3 --describe
Topic:test3    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test3    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 2,0,1
    Topic: test3    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: test3    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 2,0,1
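
If you prefer to trigger the election manually as above, the automatic rebalancing mentioned earlier can be disabled on the brokers; a minimal server.properties sketch (these are standard broker settings; the commented interval/percentage values shown are the defaults):

# server.properties
auto.leader.rebalance.enable=false
# Only relevant when automatic rebalancing is enabled:
# leader.imbalance.check.interval.seconds=300
# leader.imbalance.per.broker.percentage=10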

8. kafka-replica-verification.sh

  • Function: verify that all replicas of a topic are in sync (check replication lag)

Parameter description

Parameter  Meaning
--broker-list <String: servers>  Brokers to connect to
--fetch-size <Integer: bytes>  Fetch size of each request (default: 1048576)
--max-wait-ms <Integer: ms>  Maximum wait time for each fetch request (default: 1000)
--report-interval-ms <Long: ms>  Reporting interval (default: 30000)
--time <Long: timestamp/-1(latest)/-2 (earliest)>  Timestamp for fetching the initial offsets (default: -1)
--topic-white-list <String: Java regex (String)>  Whitelist of topics to verify replica consistency for. Defaults to all topics (default: .*)

[root@node2 bin]# ./kafka-replica-verification.sh --broker-list node1:9092,node2:9092,node3:9092  --topic-white-list test3
2020-02-20 17:15:12,267: verification process is started.
2020-02-20 17:15:42,208: max lag is 0 for partition test3-0 at offset 14 among 3 partitions
2020-02-20 17:16:12,209: max lag is 0 for partition test3-0 at offset 14 among 3 partitions
2020-02-20 17:16:42,210: max lag is 0 for partition test3-0 at offset 14 among 3 partitions

# The output above shows that replication for this topic has no lag

9. kafka-mirror-maker.sh

  • Function: cluster mirroring tool for replicating data between Kafka clusters. Multiple MirrorMaker processes can run at the same time to increase throughput; if one of them dies, the others take over its replication work.

Parameter description

Parameter  Meaning
--abort.on.send.failure <String: Stop the entire mirror maker when a send failure occurs>  Configure MirrorMaker to exit when a send failure occurs (default: true)
--blacklist <String: Java regex (String)>  Blacklist of topics to mirror. Only the old consumer supports blacklists
--consumer.config <String: config file>  Consumer config for consuming from the source cluster
--consumer.rebalance.listener <String: A custom rebalance listener of type ConsumerRebalanceListener>  Custom rebalance listener used by the MirrorMaker consumer
--message.handler <String: A custom message handler of type MirrorMakerMessageHandler>  Message handler that processes every record between the consumer and the producer
--message.handler.args <String: Arguments passed to message handler constructor.>  Arguments passed to the custom message handler used by MirrorMaker
--new.consumer  Use the new consumer in MirrorMaker (this is the default)
--num.streams <Integer: Number of threads>  Number of consumption streams (default: 1)
--offset.commit.interval.ms <Integer: offset commit interval in millisecond>  Offset commit interval in milliseconds (default: 60000)
--producer.config <String: config file>  Embedded producer config
--rebalance.listener.args <String: Arguments passed to custom rebalance listener constructor as a string.>  Arguments passed to the custom rebalance listener used by the MirrorMaker consumer
--whitelist <String: Java regex (String)>  Whitelist of topics to mirror
--help  Print help information
./kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --num.streams 2 --producer.config producer.properties --whitelist="test*"
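
The command above runs the MirrorMaker class directly via kafka-run-class.sh; the wrapper script can be invoked the same way. A hedged sketch (the config file names match the examples below, the whitelist regex is illustrative):

./kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --num.streams 2 --whitelist "test.*"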

Example consumer.config

# Consumer group id
group.id=group-designer
# Whether to auto-commit offsets. Note that offsets are not committed after every message; they are kept
# locally (in memory) and committed periodically. Default: true
auto.commit.enable=true
# Auto-commit interval. Default: 60 * 1000
auto.commit.interval.ms=1000
# Identifier of this consumer; can be set manually or generated by the system, mainly used to trace consumption
consumer.id=xxx
# Consumer client id, used to distinguish different clients; generated automatically by default
client.id=xxxx
# Maximum number of message chunks buffered on the consumer side (default: 10)
queued.max.message.chunks=50
# ZooKeeper address
zookeeper.connect=node1:2181,node2:2181,node3:2181
# ZooKeeper session timeout, used to detect whether a consumer has died. Default: 6000 ms
zookeeper.session.timeout.ms=6000
# When a consumer dies, the other consumers wait up to this long before the failure is detected and a rebalance is triggered
zookeeper.connection.timeout.ms=10000
# How often offsets are synced to ZooKeeper. Offsets are committed based on time rather than per message;
# if updating ZooKeeper fails and the consumer restarts, already-consumed messages may be received again
zookeeper.sync.time.ms=2000
# When a new consumer joins the group a rebalance happens and some partitions migrate to the new consumer.
# A consumer that obtains a partition registers a "Partition Owner registry" node in ZooKeeper, but the old
# consumer may not have released the node yet; this value controls the number of registration retries
rebalance.max.retries=5
# Maximum bytes fetched per batch; the broker will not return message chunks larger than this to the consumer.
# Each fetch returns multiple messages and this is the total size; raising it uses more consumer-side memory
fetch.message.max.bytes=1048576
# How long the server blocks when there is not enough data; once the timeout is reached the accumulated
# messages are sent to the consumer immediately even if the batch is not full
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# Initial offset to use when ZooKeeper has no offset or the offset is out of range.
# Options: smallest (earliest), largest (latest); anything else throws an exception. Default: largest
auto.offset.reset=largest
# Deserializer class
derializer.class=kafka.serializer.DefaultDecoder

Example producer.config

# Target cluster (MirrorMaker produces into this cluster; the consumer config above points at the source)
bootstrap.servers=node1:9092,node2:9092,node3:9092
acks=all
retries=3
producer.type=async
# Use a larger batch size to improve throughput
batch.size=16384
buffer.memory=33554432
linger.ms=1

10. kafka-acls.sh

  • Function: ACL (permission) management

Parameter description

Parameter  Meaning
--authorizer-properties <String: authorizer-properties>  Properties required to configure the Authorizer instance, as key=value pairs. Example: zookeeper.connect=localhost:2181
--add  Add ACLs
--allow-host <String: allow-host>  Host from which the principals listed in --allow-principal may access. If --allow-principal is specified, this option defaults to *, allowing access from all hosts
--allow-principal <String: allow-principal>  Principal in principalType:name format. Note that principalType must be supported by the Authorizer being used. For example, User:* is the wildcard indicating all users
--authorizer <String: authorizer>  Fully qualified class name of the authorizer (default: kafka.security.auth.SimpleAclAuthorizer)
--cluster  Add/remove ACLs on the cluster
--consumer  Convenience option to add/remove ACLs for the consumer role. Generates ACLs allowing topic Read and Describe and group Read
--deny-host <String: deny-host>  Host from which the principals listed in --deny-principal are denied access. If --deny-principal is specified, this option defaults to *, denying access from all hosts
--deny-principal <String: deny-principal>  Principal in principalType:name format. By default anyone not added through --allow-principal is denied access; you only need this option as a negation of an already allowed set. Note that principalType must be supported by the Authorizer being used. For example, to allow access to all users except test-user, you can allow User:* and specify --deny-principal=User:test@EXAMPLE.COM. Remember that deny rules take precedence over allow rules
--force  Assume yes to all prompts
--group <String: group>  Consumer group to add/remove ACLs for. The value * means the ACL applies to all groups
--idempotent  Enable idempotence for the producer. Should be combined with --producer. Note that idempotence is enabled automatically if the producer is authorized for a particular transactional id
--list  List ACLs for the given resource; specify the resource with --topic, --group or --cluster
--operation  Operation to allow or deny. Valid operation names are: Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, DescribeConfigs, IdempotentWrite, All (default: All)
--producer  Convenience option to add/remove ACLs for the producer role. Generates ACLs allowing topic Write and Describe and cluster Create
--remove  Remove ACLs
--topic <String: topic>  Topic to add/remove ACLs for. The value * means the ACL applies to all topics
--transactional-id <String: transactional-id>  TransactionalId to add/remove ACLs for. The value * means the ACL applies to all TransactionalIds
--help  Print help information
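
A hedged sketch of typical usage (the principal User:alice, topic and group names are illustrative; the ZooKeeper address reuses node1 from the earlier examples):

# Grant User:alice read access to topic test1 from any host
./kafka-acls.sh --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:alice --operation Read --topic test1
# Convenience option: grant the full consumer role (topic Read/Describe, group Read)
./kafka-acls.sh --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:alice --consumer --topic test1 --group group1
# List ACLs on the topic
./kafka-acls.sh --authorizer-properties zookeeper.connect=node1:2181 --list --topic test1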

11. kafka-streams-application-reset.sh

  • Function: reset a Kafka Streams application so that it can reprocess its input data from scratch

The Streams Reset Tool allows you to quickly reset an application in order to reprocess its data from scratch.

  • This tool resets offsets of input topics to the earliest available offset and it skips to the end of intermediate topics (topics used in the through() method).
  • This tool deletes the internal topics that were created by Kafka Streams (topics starting with “<application.id>-“).
    You do not need to specify internal topics because the tool finds them automatically.
  • This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command).
  • This tool will not clean up the local state on the stream application instances (the persisted stores used to cache aggregation results).
    You need to call KafkaStreams#cleanUp() in your application or manually delete them from the directory specified by “state.dir” configuration (/tmp/kafka-streams/<application.id> by default).

*** Important! You still need to clean up the local state after running the reset tool, otherwise the output will be incorrect!

Parameter description

Parameter  Meaning
--application-id <String: id>  The Kafka Streams application ID (application.id)
--bootstrap-servers <String: urls>  Broker list
--config-file <String: file name>  Property file containing configs to pass to the client and the embedded consumer
--dry-run  Display the actions that would be performed without executing the reset
--input-topics <String: list>  Comma-separated list of user input topics. For these topics the tool resets the offset to the earliest available offset
--intermediate-topics <String: list>  Comma-separated list of intermediate user topics (topics used in the through() method). For these topics the tool skips to the end
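
A hedged invocation sketch (the application id and topic names are illustrative; --dry-run only prints what would be done):

./kafka-streams-application-reset.sh --application-id my-streams-app --bootstrap-servers node1:9092,node2:9092,node3:9092 --input-topics input-topic1,input-topic2 --dry-run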
