背景
前段时间由于集群磁盘吃紧做了一次对HDFS上文件副本缩减的操作,导致集群的DataNode间数据不均衡,所以需要做一次rebalance。
1. balance参数介绍
1 | hdfs balancer –help |
| 参数 | 说明 |
|---|---|
| -policy <policy> | 平衡策略:datanode或blockpool,默认为datanode级别的平衡策略 |
| -threshold <threshold> | 判断集群是否平衡的目标参数,每个datanode存储使用率和集群总存储使用率的差值都应该小于这个阈值,理论上这个参数设置的越小,整个集群就越平衡,但是在线上环境中,Hadoop集群在进行balance时,还在并发的进行写入和删除数据,所以可能无法达到设置的平衡参数值。参数区间为 0~100。 |
| -exclude [-f <hosts-file> | <comma-separated list of hosts>] |
默认为空,指定该部分ip不参与balance, -f:指定输入为文件 |
| -include [-f <hosts-file> | <comma-separated list of hosts>] |
默认为空,只允许该部分ip参与balance, -f:指定输入为文件 |
| -source [-f <hosts-file> | <comma-separated list of hosts>] |
默认为空,仅选择指定的数据节点作为源节点。 -f:指定输入为文件 |
| -blockpools <comma-separated list of blockpool ids> | 平衡器 仅在blockpool模式运行 |
| -idleiterations <idleiterations> | 迭代次数,默认为5 ,-1为无限次,到集群平衡为止 |
| -runDuringUpgrade | 是否在正在进行的HDFS升级期间运行平衡器。这通常是不希望的,因为它不会影响过度使用的机器上的已用空间。 |
注意:该工具反复的将块从高利用率的数据节点移动到使用低的数据节点。 在每次迭代中,datanode移动或接收的数量不超过 MIN(10G,容量的阈值)。 每次迭代运行不超过20次分钟。 在每次迭代结束时,平衡器获得来自namenode的最新的数据节点的信息。
Block Pool(块池)
所谓Block pool(块池)就是属于单个命名空间的一组block(块)。每一个datanode为所有的block pool存储块。Datanode是一个物理概念,而block pool是一个重新将block划分的逻辑概念。同一个datanode中可以存着属于多个block pool的多个块。Block pool允许一个命名空间在不通知其他命名空间的情况下为一个新的block创建Block ID。同时,一个Namenode失效不会影响其下的datanode为其他Namenode的服务。
当datanode与Namenode建立联系并开始会话后自动建立Block pool。每个block都有一个唯一的标识,这个标识我们称之为扩展的块ID(Extended Block ID)= BlockID+BlockID。这个扩展的块ID在HDFS集群之间都是唯一的,这为以后集群归并创造了条件。
Datanode中的数据结构都通过块池ID(BlockPoolID)索引,即datanode中的BlockMap,storage等都通过BPID索引。
在HDFS中,所有的更新、回滚都是以Namenode和BlockPool为单元发生的。即同一HDFS Federation中不同的Namenode/BlockPool之间没有什么关系。
Hadoop V0.23版本中Block Pool的管理功能依然放在了Namenode中,将来的版本中会将Block Pool的管理功能移动的新的功能节点中。
2. 迁移数据效率优化
1.增加带宽
修改 hdfs-site.xml 中 dfs.balance.bandwidthPerSec 参数
参数含义:设置balance工具在运行中所能占用的带宽,设置的过大可能会造成mapred运行缓慢,默认设置:10M
修改有如下两种方式:
- 在配置文件中修改带宽 (修改需重启hdfs):
1 | <property> |
- 动态增大带宽(不需重启)
需要切换到hdfs用户,不可设置太大,会占用mapreduce任务的带宽
1 | hdfs dfsadmin -fs hdfs://node1:8020 -setBalancerBandwidth 62500000 |
单位是Byte,这个示情况而定,如果交换机性能好点的,完全可以设定为100MB,如果机器的网卡和交换机的带宽有限,可以适当降低该速度。
2.增加最大线程数
修改hdfs-site.xml 中 dfs.datanode.max.transfer.threads = 4096
(如果运行hbase的话建议为16384),指定用于在DataNode间传输block数据的最大线程数,老版本的对应参数为dfs.datanode.max.xcievers
3.增加用于balance待移动block的最大线程数
修改hdfs-site.xml 中 dfs.datanode.balance.max.concurrent.moves = 50
指定DataNode上同时用于balance待移动block的最大线程个数,这个值默认是50
如果配置没生效或者不合理的话,Balancer会有如下警告信息:
16/05/17 11:54:59 WARN balancer.Dispatcher: Failed to move blk_1075360746_1920035 with size=134217728 from xx.xx.xx.48:50010:DISK to xx.xx.xx.xx:50010:DISK through xx.xx.xx.xx:50010: Got error, status message opReplaceBlock BP-647596829-xx.xx.xx.xx-1448614319339:blk_1075360746_1920035 received exception java.io.IOException: Got error, status message Not able to copy block 1075360746 to /xx.xx.xx.xx:39630 because threads quota is exceeded., copy block BP-647596829-xx.xx.xx.xx-1448614319339:blk_1075360746_1920035 from /xx.xx.xx.xx:50010, block move is failed
状态查看
1 | hdfs dfsadmin balancer –daemon status |
3. Hadoop Balancer的步骤:
- 从namenode获取datanode磁盘的使用情况
- 计算需要把哪些数据移动到哪些节点
- 分别移动,完成后删除旧的block信息
- 循环执行,直到达到平衡标准
4. 退出条件
balance脚本在满足以下任何一个条件都会自动退出:
- 集群是平衡的;
- 没有块可以移动;
- 没有为指定的连续迭代移动块(默认为5);
- 与namenode通信时发生IOException;
- 另一个平衡器正在运行。
5. 源码解析
5.1 统计需要balance的datanode:
1 |
|
5.2 计算集群平均使用率
| 指标 | 说明 |
|---|---|
| totalUsedSpaces | 各datanode已使用空间(dfsUsed,不包含non dfsUsed)相加的总和; |
| totalCapacities | 各datanode总空间(DataNode配置的服务器磁盘目录)相加的总和; |
| 集群平均使用率 | 各datanode已使用空间相加的总和 *100 / 各datanode总空间相加总和 |
|---|---|
| average | totalUsedSpaces * 100 / totalCapacities |
1 | line 54 .apache.hadoop.hdfs.server.balancer.BalancingPolicy.java |
5.3 计算单个DataNode使用率
| 指标 | 说明 |
|---|---|
| dfsUsed | 当前datanode dfs(dfsUsed,不包含non dfsUsed)已使用空间; |
| capacity | 当前datanode(DataNode配置的服务器磁盘目录)总空间; |
| 单个datanode使用率 | = 当前 datanode dfs 已使用空间 *100.0 / 当前 datanode 总空间 |
| utilization | = dfsUsed * 100.0 / capacity |
1 | line 71 .apache.hadoop.hdfs.server.balancer.BalancingPolicy.java |
| 指标 | 说明 |
|---|---|
| 单个datanode使用率与集群平均使用率差值 | 单个datanode使用率 – 集群平均使用率 |
| utilizationDiff | utilization – average |
| 单个datanode utilizationDiff与阈值的差值 | | 单个datanode使用率与集群平均使用率差值| – 阈值 |
| thresholdDiff | | utilizationDiff| – threshold |
| 需要迁移或者可以迁入的空间 | | 单个datanode使用率与集群平均使用率差值| *当前datanode总空间 |
| maxSizeToMove | | utilizationDiff| * capacity |
可以迁入的空间计算:Math.min(remaining, maxSizeToMove)
需要迁移的空间计算:Math.min(max, maxSizeToMove)
remaining:datanode节点剩余空间
max:默认单个datanode单次balance迭代可以迁移的最大空间限制,缺省10G)
默认迭代次数为5,即运行一次balance脚本,单个datanode可以最大迁移的空间为:5*10G = 50G
差值判断后datanode的保存队列:
| 指标 | 比较 | 比较说明 | 类型 |
|---|---|---|---|
| overUtilized | utilizationDiff > 0 && thresholdDiff > 0 | 使用率超过平均值,且差值大于阈值 | 需要迁出数据的节点(source类型) |
| underUtilized | utilizationDiff < 0 && thresholdDiff > 0 | 使用率低于平均值,且差值大于阈值 | 需要迁入数据的节点(Target类型) |
| aboveAvgUtilized | utilizationDiff > 0 && thresholdDiff <= 0 | 使用率超过平均值,且差值小于等于阈值 | 需要迁出数据的节点(source类型) |
| belowAvgUtilized | utilizationDiff < 0 && thresholdDiff <= 0 | 使用率低于平均值,且差值小于等于阈值 | 需要迁入数据的节点(Target类型) |
5.4 数据迁移配对原则与步骤
原则
- 优先为同机架,其次为其它机架;
- 一对多配对
| 步骤 | 操作 |
|---|---|
| 第一步[Source -> Target] | each overUtilized datanode => one or more underUtilized datanodes |
| 第二步[Source -> Target] | match each remaining overutilized datanode => one or more belowAvgUtilized datanodes |
| 第三步[Target -> Source] | each remaining underutilized datanode (step 1未和overUtilized匹配过) => one or more aboveAvgUtilized datanodes |
1 |
|
构建每一对<source, target>时,需要计算当前可以迁移或者迁入的空间大小。dispatcher创建dispatchExecutor线程池执行数据迁移调度。
1 |
|
6. 执行balance
###6.1 预计算迁移数据量
| 节点名称 | node2 | node3 |
|---|---|---|
| 容量 | 1597G | 1597G |
| 总容量 | 3194G | 3194G |
| 使用量 | 292G | 18G |
| 集群平均使用率 (average) |
9.7057 | 9.7057 |
| 单个datanode使用率 (utilization) |
18.2843 | 1.1271 |
| 单个datanode使用率与集群平均使用率差值 ( utilizationDiff) |
8.5786 | -8.5786 |
| 单个datanode utilizationDiff与阈值的差值(thresholdDiff ) | 3.5786 | 3.5786 |
| 需要迁移或者可以迁入的空间 (maxSizeToMove ) |
57.1502 | 57.1502 |
| utilizationDiff>0 & thresholdDiff >0 |
utilizationDiff<0 & thresholdDiff >0 |
|
| 迁出数据 | 迁入数据 |
| 指标 | 计算 |
|---|---|
| 集群平均使用率 | 各datanode已使用空间相加的总和 *100 / 各datanode总空间相加总和 |
| average | totalUsedSpaces * 100 / totalCapacities |
| 9.7057% | (292+18)*100/3194 |
| 单个datanode使用率 | 当前 datanode dfs 已使用空间 *100.0 / 当前 datanode 总空间 | |
|---|---|---|
| utilization | dfsUsed * 100.0 / capacity | |
| node2 | 18.2843 % | (292*100)/1597 |
| node3 | 1.1271% | (18*100)/1597 |
| 单个datanode使用率与集群平均使用率差值 | 单个datanode使用率 – 集群平均使用率 | |
|---|---|---|
| utilizationDiff | utilization – average | |
| node2 | 8.5786% | 18.2843% – 9.7057% |
| node3 | -8.5786% | 1.1271% – 9.7057% |
| 单个datanode| utilizationDiff与阈值的差值 | |单个datanode使用率与集群平均使用率差值| – 阈值 | |
|---|---|---|
| thresholdDiff | | utilizationDiff| – threshold | |
| node2 | 3.5786% | 8.5786% – 5% |
| node3 | 3.5786% |
| 需要迁移或者可以迁入的空间 | |单个datanode使用率与阈值差值| *当前 datanode 容量 | |
|---|---|---|
| maxSizeToMove | |thresholdDiff | * 1597 | |
| node2 | 57.1502 | 3.5786% * 1597 |
| node3 | 57.1502 | 3.5786% * 1597 |
6.2 执行balance
balance 命令
1 | hdfs balancer -fs hdfs://node1:8020 -threshold 5 |
打印日志
2019-7-10 14:56:18 0 4.68 GB 57.20 GB 10 GB
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.movedWinWidth = 5400000 (default=5400000)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.moverThreads = 1000 (default=1000)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.dispatcherThreads = 200 (default=200)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.datanode.balance.max.concurrent.moves = 50 (default=50)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.getBlocks.size = 2147483648 (default=2147483648)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.getBlocks.min-block-size = 10485760 (default=10485760)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.max-size-to-move = 10737418240 (default=10737418240)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.blocksize = 134217728 (default=134217728)
结束balance之后计算迁移了多少数据量 77.21-18.49=58.72G 与我们计算的结果大约相差1G左右
7. 结语
对于一些大型的HDFS集群(随时可能扩容或下架服务器),balance脚本需要作为后台常驻进程;
根据官方建议,脚本需要部署在相对空闲的服务器上;
停止脚本通过kill进程实现(建议不kill,后台运行完会自动停止,多次执行同时也只会有一个线程存在,其它自动失败);
针对datanode存储维护,可以针对以下几个方向进行优化:
- 通过参数(idleiterations)增加迭代次数,以增加datanode允许迁移的数据;
- 通过参数(exclude, include)设计合理的允许进行balance策略的服务器,比如将使用率最低(20%)和最高(20%)的进行balance策略;
- 通过参数(threshold )设计合理的阈值;