- 脚本列表
| 序号 | 脚本 | 功能 |
|---|---|---|
| 1 | connect-distributed.sh && connect-standalone.sh | 连接kafka集群/单机模式 |
| 2 | kafka-run-class.sh | 执行kafka.tools的部分功能 |
| 3 | kafka-reassign-partitions.sh | 分区重分配脚本 |
| 4 | kafka-configs.sh | 配置管理脚本 |
| 5 | kafka-delete-records.sh | 删除低水位的数据 |
| 6 | kafka-log-dirs.sh | kafka消息日志目录信息 |
| 7 | kafka-preferred-replica-election.sh | 触发preferred replica选举 |
| 8 | kafka-replica-verification.sh | 复制进度验证脚本 |
| 9 | kafka-mirror-maker.sh | 不同数据中心kafka集群复制工具 |
| 10 | kafka-acls.sh | 权限管理 |
| 11 | kafka-streams-application-reset.sh | kafka stream 命令 |
1. connect-distributed.sh && connect-standalone.sh
Kafka Connect是在0.9以后加入的功能,主要是用来将其他系统的数据导入到Kafka,然后再将Kafka中的数据导出到另外的系统。可以用来做实时数据同步的ETL,数据实时分析处理等。
主要有2种模式:Standalone(单机模式)和Distribute(分布式模式)。
单机主要用来开发,测试,分布式的用于生产环境。
用法比较复杂,建议参考:https://www.orchome.com/344
2. kafka-run-class.sh
- 功能:USAGE: ./kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]
参数说明
| 参数 | 含义 |
|---|---|
| –group |
消费组 |
| –output-file |
输出文件 |
| –zkconnect |
zookeeper连接字符串 |
| –help | 打印帮助信息 |
1 |
|
3. kafka-reassign-partitions.sh
- 功能:此命令在副本之间移动主题分区。
参数说明
| 参数 | 含义 |
|---|---|
| –zookeeper <String: urls> | Zookeeper连接的连接字符串,格式为host:port,多个用逗号隔开 |
| –reassignment-json-file <String: manual assignment json file path> | 要使用的格式是- {“partition “: [{“topic”: “foo”, “partition”: 1, “replicas”: [1,2,3], “log_dirs”: [“dir1”,”dir2”,”dir3”]}], “version”:1}注意,” log_dirs”是可选的。当它被指定时,它的长度必须等于副本列表的长度。此列表中的值可以是代理上的日志目录的”any”或赦免路径。如果指定了绝对日志目录路径,则当前需要的是尚未在该代理上创建副本。随后将在代理上的指定日志目录中创建副本。 |
| –verify | 验证重新分配是否按照—resignment - json-file选项指定的方式完成。如果存在为指定的副本占用的节流阀,并且重新平衡已经完成,则节流阀将被移除 |
| –execute | 按照–sign -json-file选项指定的方式开始重新分配 |
| –bootstrap-server <String: Server(s) to use for bootstrapping> | 用于引导的服务器。 为重新分配json文件中的任何副本指定了日志导向器的绝对路径所需 |
| –broker-list <String: brokerlist> | 需要以”0,1,2”的形式重新分配分区的broker列表。如果使用-topic-to-move-json-file生成重新分配配置,则需要这样做 |
| –generate | 生成一个候选分区重新分配配置。注意,这只生成一个候选赋值,而不执行它。 |
| –throttle <Long: throttle> | 分区的移动将被调整到这个值(字节/秒)。重新运行此选项,而一个平衡正在进行中,将改变油门值。节流速率至少为1kb /s。(默认值:1) |
| –timeout <Long: timeout> | 允许等待成功启动分区重分配执行的最长时间(默认值:10000) |
| –topics-to-move-json-file <String: topics to reassign json file path> | 生成一个重新分配配置,将指定主题的分区移动到由–broker- list选项指定的代理列表中。使用的格式是- {“topics”: [{“topic”: “foo”},{“topic”: “foo1”}], “version”:1} |
| –disable-rack-aware | 禁用支持机架的复制分配 |
3.1 移动副本位置
1 | # 查看副本状态 |
3.2 扩展副本
把 partitin 0 在broker 1 增加一个副本,把 partitin 1 在broker 1 增加一个副本
1 | #首先编辑扩展副本文件 |
4. kafka-configs.sh
- 功能:添加/删除topic、client、user或broker的实体配置
参数说明
| 参数 | 含义 |
|---|---|
| –zookeeper <String: urls> | Zookeeper连接的连接字符串,格式为host:port,多个用逗号隔开 |
| –describe | 列出给定实体的配置。 |
| –add-config |
要添加的配置的键值对。方括号可用于将包含逗号的值分组:”k1=v1,k2=[v1,v2,v2],k3=v3”。下面是有效配置的列表:对于entity_type |
| –alter | 更改实体的配置。 |
| –delete-config |
从配置中删除配置 key |
| –entity-default | 客户端/用户的默认实体名称 (applies to corresponding entity type in command line) |
| –entity-name |
实体的名称 (topic name/client id/user principal name/broker id) |
| –entity-type |
实体的类型 (topics/clients/users/brokers) |
| –force | 禁止控制台提示 |
| –help | 帮助文档 |
4.1 查看配置
1 | # 查看topic 配置 |
4.2 添加配置
1 |
|
4.3 删除配置
1 | # 删除配置 |
5. kafka-delete-records.sh
- 功能:
参数说
| 参数 | 含义 |
|---|---|
| –bootstrap-server <String: server(s)> | 要连接的broker列表 |
| –offset-json-file <String: Offset json file path> | 每个分区具有偏移量的JSON文件。使用的格式是:{“partition “: [{“topic”: “foo”, “partition”: 1, “offset”: 1}], “version”:1} |
| –command-config <String: command config property file path> | 要传递给Admin Client的配置的属性文件。 |
5.1 删除数据
1 | # 首先编辑json文件 |
5.2 删除offset 大于现有数据
1 | # 将配置文件编辑为: |
5.3 删除offset 等于现有数据最大offset
1 | # 将配置文件编辑为: |
6. kafka-log-dirs.sh
- 功能:
参数说明
| 参数 | 含义 |
|---|---|
| –bootstrap-server <String: The server (s) to use for bootstrapping> | 要连接的broker列表 |
| –broker-list <String: Broker list> | 要查询的broker列表,格式为”0,1,2”。 如果未指定列表,将查询集群中的所有broker |
| –describe | 描述指定broker上的指定日志目录。 |
| –topic-list <String: Topic list> | 要查询的主题列表,形式为”topic1,topic2,topic3”。 如果未指定主题列表,将查询所有主题 |
1 |
|
7. kafka-preferred-replica-election.sh
- 功能:手动partition leader 均衡
参数说明
kafka提供了一个参数auto.leader.rebalance.enable自动做这件事情,且默认为true,原理是一个后台线程检查并触发leader balance。但是并不建议把这个参数设置为true。因为担心这个自动选举发生在业务高峰期,从而导致影响业务。
| 参数 | 含义 |
|---|---|
| –zookeeper <String: urls> | Zookeeper连接的连接字符串,格式为host:port,多个用逗号隔开 |
| –path-to-json-file <String: list of partitions for which preferred replica leader election needs to be triggered> | JSON文件中包含了一个分区列表,优先选择的是哪个分区,格式如下:{“partitions”: [{“topic”: “foo”, “partition”: 1}, {“topic”: “foobar”, “partition”: 2}]}默认为所有现有分区 |
1 | vim leader.json |
8. kafka-replica-verification.sh
- 功能:验证topic的所有副本的同步数据情况
参数说明
| 参数 | 含义 |
|---|---|
| –broker-list String:servers | 要连接的broker |
| –fetch-size <Integer: bytes> | 每个请求的获取大小(default: 1048576) |
| –max-wait-ms <Integer: ms> | 每个fetch请求的最大等待时间。(default: 1000) |
| –report-interval-ms <Long: ms> | 报告时间间隔。(default: 30000) |
| –time <Long: timestamp/-1(latest)/-2 (earliest)> | 获取初始偏移量的时间戳。(default: -1) |
| –topic-white-list <String: Java regex (String)> | 验证副本一致性的主题白列表。默认为所有主题。 (default: .*) |
1 |
|
9. kafka-mirror-maker.sh
- 功能:集群镜像工具,实现kafka集群间的数据同步,mirror maker进程可以同时开启多个,提高吞吐量。如果其中一个进程挂了,其他线程还能接管其同步任务
参数说明
| 参数 | 含义 |
|---|---|
| –abort.on.send.failure <String: Stop the entire mirror maker when a send failure occurs> | 将镜像生成器配置为在发送失败时退出。(default: true) |
| –blacklist <String: Java regex (String)> | 要镜像的主题的黑名单。只有老客户支持黑名单。 |
| –consumer.config <String: config file> | 用于从源集群消费的Consumer配置。 |
| –consumer.rebalance.listener <String: A custom rebalance listener of type ConsumerRebalanceListener> | 消费者再平衡监听器用于镜像制造商消费者。 |
| –message.handler <String: A custom message handler of type MirrorMakerMessageHandler> | 消息处理程序,它将处理消费者和生产者之间的每条记录。 |
| –message.handler.args <String: Arguments passed to message handler constructor.> | 自定义消息处理程序为镜像生成器使用的参数 |
| –new.consumer | 在镜像生成器中使用新用户(这是默认设置)。 |
| –num.streams <Integer: Number of threads> | 消费流的数量,默认1 |
| –offset.commit.interval.ms <Integer:offset commit interval in millisecond> | 提交offset的时间间隔,单位毫秒default: 60000) |
| –producer.config <String: config file> | Embedded producer config. |
| –rebalance.listener.args <String: Arguments passed to custom rebalance listener constructor as a string.> | 自定义重新平衡侦听器用于镜像生成器使用者的参数。 |
| –whitelist <String: Java regex (String)> | 镜像topic的白名单。 |
| –help | 打印帮助信息 |
1 | ./kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --num.streams 2 --producer.config producer.properties --whitelist="test*" |
consumer.config 配置示例
1 | # 指定消费者 |
producer.config 配置示例
1 | # 源集群 |
10. kafka-acls.sh
- 功能:
参数说明
| 参数 | 含义 |
|---|---|
| –authorizer-properties <String: authorizer-properties> | 配置Authorizer实例所需的属性。 KV对. 默认示例:connect=localhost:2181 |
| –add | 添加ACL。 |
| –allow-host <String: allow-host> | –allow-principal中列出的主体可以访问的主机。如果您指定了–allow-principal,则此选项的默认值将设置为*,允许从所有主机进行访问。 |
| –allow-principal <String: allow-principal> | principal is in principalType:name format. Note that principalType must be supported by the Authorizer being used. For example, User:* is the wild card indicating all users. |
| –authorizer <String: authorizer> | 授权方的完全限定类名,(默认:kafka.security.auth.SimpleAclAuthorizer) |
| –cluster | 添加/删除集群acl。 |
| –consumer | 为使用者角色添加/删除acl的方便选项。这将生成允许读、主题描述和组读的acl。 |
| –deny-host <String: deny-host> | 拒绝访问–deny-principal中列出的主体所在的主机。如果您指定了–deny–principal,那么这个选项的默认值将设置为*,它拒绝所有主机的访问。 |
| –deny-principal <String: deny- principal> | principal is in principalType:name format. By default anyone not added through –allow-principal is denied access. You only need to use this option as negation to already allowed set. Note that principalType must be supported by the Authorizer being used. For example if you wanted to allow access to all users in the system but not test-user you can define an ACL that allows access to User:* and specify –deny- principal=User:test@EXAMPLE.COM. AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES. |
| –force | 假设所有操作都是yes,规避提示 |
| –group <String: group> | 应该添加或删除acl的使用者组。值*表示acl应该应用于所有组。 |
| –idempotent | 为生产者启用幂等性。这应该与—producer选项结合使用。注意,如果生产者被授权给特定的事务id,则自动启用幂等性。 |
| –list | 列出指定资源的acl,使用–topic 或–group 或–cluster指定资源。 |
| –operation |
允许或拒绝的操作。有效的操作名是:Read Write Create Delete Alter Describe ClusterAction AlterConfigs Describe beconfigs IdempotentWrite All(默认:All) |
| –producer | 方便的选项添加/删除生产者角色的acl。这将生成允许写、主题描述和在集群上创建的acl。 |
| –remove | 删除acl。 |
| –topic <String: topic> | 应该添加或删除哪些acl的主题。值*表示ACL应该应用于所有主题。 |
| –transactional-id String:transactional-id | 应该添加或删除acl的transactionalId。值*表示acl应该应用于所有transactionalid。 |
| –help | 帮助文档 |
1 |
11. kafka-streams-application-reset.sh
- 功能:
参数说明- 参数1
- 参数2
The Streams Reset Tool allows you to quickly reset an application in order to reprocess its data from scratch.
- This tool resets offsets of input topics to the earliest available offset and it skips to the end of intermediate topics (topics used in the through() method).
- This tool deletes the internal topics that were created by Kafka Streams (topics starting with “<application.id>-“).
You do not need to specify internal topics because the tool finds them automatically. - This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command).
- This tool will not clean up the local state on the stream application instances (the persisted stores used to cache aggregation results).
You need to call KafkaStreams#cleanUp() in your application or manually delete them from the directory specified by “state.dir” configuration (/tmp/kafka-streams/<application.id> by default).
*** 重要!如果您在运行重置工具后需要清理本地存储,否则会有错误的输出!
| 参数 | 含义 |
|---|---|
| –application-id <String: id> | The Kafka Streams application ID。(application.id). |
| –bootstrap-servers <String: urls> | broker列表 |
| –config-file <String: file name> | 包含要传递给client和consumer的配置的属性文件。 |
| –dry-run | 显示将在不执行复位命令的情况下执行的操作。 |
| –input-topics <String: list> | 逗号分隔的用户输入topic列表。对于这些主题,工具将把offset重置为最早可用的offset。 |
| –intermediate-topics <String: list> | 中间用户主题(在through()方法中使用的主题)的逗号分隔列表。对于这些主题,工具将跳到最后。 |
1 |