背景
前段时间由于集群磁盘吃紧做了一次对HDFS上文件副本缩减的操作,导致集群的DataNode间数据不均衡,所以需要做一次rebalance。
1. balance参数介绍
hdfs balancer –help
参数 | 说明 |
---|---|
-policy <policy> | 平衡策略:datanode或blockpool,默认为datanode级别的平衡策略 |
-threshold <threshold> | 判断集群是否平衡的目标参数,每个datanode存储使用率和集群总存储使用率的差值都应该小于这个阈值,理论上这个参数设置的越小,整个集群就越平衡,但是在线上环境中,Hadoop集群在进行balance时,还在并发的进行写入和删除数据,所以可能无法达到设置的平衡参数值。参数区间为 0~100。 |
-exclude [-f <hosts-file> | <comma-separated list of hosts>] |
默认为空,指定该部分ip不参与balance, -f:指定输入为文件 |
-include [-f <hosts-file> | <comma-separated list of hosts>] |
默认为空,只允许该部分ip参与balance, -f:指定输入为文件 |
-source [-f <hosts-file> | <comma-separated list of hosts>] |
默认为空,仅选择指定的数据节点作为源节点。 -f:指定输入为文件 |
-blockpools <comma-separated list of blockpool ids> | 平衡器 仅在blockpool模式运行 |
-idleiterations <idleiterations> | 迭代次数,默认为5 ,-1为无限次,到集群平衡为止 |
-runDuringUpgrade | 是否在正在进行的HDFS升级期间运行平衡器。这通常是不希望的,因为它不会影响过度使用的机器上的已用空间。 |
注意:该工具反复的将块从高利用率的数据节点移动到使用低的数据节点。 在每次迭代中,datanode移动或接收的数量不超过 MIN(10G,容量的阈值)。 每次迭代运行不超过20次分钟。 在每次迭代结束时,平衡器获得来自namenode的最新的数据节点的信息。
Block Pool(块池)
所谓Block pool(块池)就是属于单个命名空间的一组block(块)。每一个datanode为所有的block pool存储块。Datanode是一个物理概念,而block pool是一个重新将block划分的逻辑概念。同一个datanode中可以存着属于多个block pool的多个块。Block pool允许一个命名空间在不通知其他命名空间的情况下为一个新的block创建Block ID。同时,一个Namenode失效不会影响其下的datanode为其他Namenode的服务。
当datanode与Namenode建立联系并开始会话后自动建立Block pool。每个block都有一个唯一的标识,这个标识我们称之为扩展的块ID(Extended Block ID)= BlockID+BlockID。这个扩展的块ID在HDFS集群之间都是唯一的,这为以后集群归并创造了条件。
Datanode中的数据结构都通过块池ID(BlockPoolID)索引,即datanode中的BlockMap,storage等都通过BPID索引。
在HDFS中,所有的更新、回滚都是以Namenode和BlockPool为单元发生的。即同一HDFS Federation中不同的Namenode/BlockPool之间没有什么关系。
Hadoop V0.23版本中Block Pool的管理功能依然放在了Namenode中,将来的版本中会将Block Pool的管理功能移动的新的功能节点中。
2. 迁移数据效率优化
1.增加带宽
修改 hdfs-site.xml 中 dfs.balance.bandwidthPerSec 参数
参数含义:设置balance工具在运行中所能占用的带宽,设置的过大可能会造成mapred运行缓慢,默认设置:10M
修改有如下两种方式:
- 在配置文件中修改带宽 (修改需重启hdfs):
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<!--<备注:62500000 / (1024 * 1024) = 60M/s>-->
<value>62500000</value>
</property>
- 动态增大带宽(不需重启)
需要切换到hdfs用户,不可设置太大,会占用mapreduce任务的带宽
hdfs dfsadmin -fs hdfs://node1:8020 -setBalancerBandwidth 62500000
单位是Byte,这个示情况而定,如果交换机性能好点的,完全可以设定为100MB,如果机器的网卡和交换机的带宽有限,可以适当降低该速度。
2.增加最大线程数
修改hdfs-site.xml 中 dfs.datanode.max.transfer.threads = 4096
(如果运行hbase的话建议为16384),指定用于在DataNode间传输block数据的最大线程数,老版本的对应参数为dfs.datanode.max.xcievers
3.增加用于balance待移动block的最大线程数
修改hdfs-site.xml 中 dfs.datanode.balance.max.concurrent.moves = 50
指定DataNode上同时用于balance待移动block的最大线程个数,这个值默认是50
如果配置没生效或者不合理的话,Balancer会有如下警告信息:
16/05/17 11:54:59 WARN balancer.Dispatcher: Failed to move blk_1075360746_1920035 with size=134217728 from xx.xx.xx.48:50010:DISK to xx.xx.xx.xx:50010:DISK through xx.xx.xx.xx:50010: Got error, status message opReplaceBlock BP-647596829-xx.xx.xx.xx-1448614319339:blk_1075360746_1920035 received exception java.io.IOException: Got error, status message Not able to copy block 1075360746 to /xx.xx.xx.xx:39630 because threads quota is exceeded., copy block BP-647596829-xx.xx.xx.xx-1448614319339:blk_1075360746_1920035 from /xx.xx.xx.xx:50010, block move is failed
状态查看
hdfs dfsadmin balancer –daemon status
3. Hadoop Balancer的步骤:
- 从namenode获取datanode磁盘的使用情况
- 计算需要把哪些数据移动到哪些节点
- 分别移动,完成后删除旧的block信息
- 循环执行,直到达到平衡标准
4. 退出条件
balance脚本在满足以下任何一个条件都会自动退出:
- 集群是平衡的;
- 没有块可以移动;
- 没有为指定的连续迭代移动块(默认为5);
- 与namenode通信时发生IOException;
- 另一个平衡器正在运行。
5. 源码解析
5.1 统计需要balance的datanode:
line 1098 @org.apache.hadoop.hdfs.server.balancer.Dispatcher.java
//是否忽略datanode节点
private boolean shouldIgnore(DatanodeInfo dn) {
// ignore decommissioned nodes (忽略已经下架的datanode)
final boolean decommissioned = dn.isDecommissioned();
// ignore decommissioning nodes(忽略正在下架的datanode)
final boolean decommissioning = dn.isDecommissionInProgress();
// ignore nodes in exclude list (忽略参数:-exclude配置的datanode)
final boolean excluded = Util.isExcluded(excludedNodes, dn);
// ignore nodes not in the include list (if include list is not empty)
// (如果参数:-include不为空,忽略不在include列表里的datanode)
final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
// 如果上述条件有一个忽略条件为true则忽略该datanode
if (decommissioned || decommissioning || excluded || notIncluded) {
if (LOG.isTraceEnabled()) {
LOG.trace("Excluding datanode " + dn
+ ": decommissioned=" + decommissioned
+ ", decommissioning=" + decommissioning
+ ", excluded=" + excluded
+ ", notIncluded=" + notIncluded);
}
return true;
}
return false;
}
5.2 计算集群平均使用率
指标 | 说明 |
---|---|
totalUsedSpaces | 各datanode已使用空间(dfsUsed,不包含non dfsUsed)相加的总和; |
totalCapacities | 各datanode总空间(DataNode配置的服务器磁盘目录)相加的总和; |
集群平均使用率 | 各datanode已使用空间相加的总和 *100 / 各datanode总空间相加总和 |
---|---|
average | totalUsedSpaces * 100 / totalCapacities |
line 54 @org.apache.hadoop.hdfs.server.balancer.BalancingPolicy.java
void initAvgUtilization() {
for(StorageType t : StorageType.asList()) {
final long capacity = totalCapacities.get(t);
if (capacity > 0L) {
final double avg = totalUsedSpaces.get(t)*100.0/capacity;
avgUtilizations.set(t, avg);
}
}
}
5.3 计算单个DataNode使用率
指标 | 说明 |
---|---|
dfsUsed | 当前datanode dfs(dfsUsed,不包含non dfsUsed)已使用空间; |
capacity | 当前datanode(DataNode配置的服务器磁盘目录)总空间; |
单个datanode使用率 | = 当前 datanode dfs 已使用空间 *100.0 / 当前 datanode 总空间 |
utilization | = dfsUsed * 100.0 / capacity |
line 71 @org.apache.hadoop.hdfs.server.balancer.BalancingPolicy.java
Double getUtilization(DatanodeStorageReport r, final StorageType t) {
long capacity = 0L;
long dfsUsed = 0L;
for(StorageReport s : r.getStorageReports()) {
if (s.getStorage().getStorageType() == t) {
capacity += s.getCapacity();
dfsUsed += s.getDfsUsed();
}
}
return capacity == 0L? null: dfsUsed*100.0/capacity;
}
指标 | 说明 |
---|---|
单个datanode使用率与集群平均使用率差值 | 单个datanode使用率 – 集群平均使用率 |
utilizationDiff | utilization – average |
单个datanode utilizationDiff与阈值的差值 | | 单个datanode使用率与集群平均使用率差值| – 阈值 |
thresholdDiff | | utilizationDiff| – threshold |
需要迁移或者可以迁入的空间 | | 单个datanode使用率与集群平均使用率差值| *当前datanode总空间 |
maxSizeToMove | | utilizationDiff| * capacity |
可以迁入的空间计算:Math.min(remaining, maxSizeToMove)
需要迁移的空间计算:Math.min(max, maxSizeToMove)
remaining:datanode节点剩余空间
max:默认单个datanode单次balance迭代可以迁移的最大空间限制,缺省10G)
默认迭代次数为5,即运行一次balance脚本,单个datanode可以最大迁移的空间为:5*10G = 50G
差值判断后datanode的保存队列:
指标 | 比较 | 比较说明 | 类型 |
---|---|---|---|
overUtilized | utilizationDiff > 0 && thresholdDiff > 0 | 使用率超过平均值,且差值大于阈值 | 需要迁出数据的节点(source类型) |
underUtilized | utilizationDiff < 0 && thresholdDiff > 0 | 使用率低于平均值,且差值大于阈值 | 需要迁入数据的节点(Target类型) |
aboveAvgUtilized | utilizationDiff > 0 && thresholdDiff <= 0 | 使用率超过平均值,且差值小于等于阈值 | 需要迁出数据的节点(source类型) |
belowAvgUtilized | utilizationDiff < 0 && thresholdDiff <= 0 | 使用率低于平均值,且差值小于等于阈值 | 需要迁入数据的节点(Target类型) |
5.4 数据迁移配对原则与步骤
原则
- 优先为同机架,其次为其它机架;
- 一对多配对
步骤 | 操作 |
---|---|
第一步[Source -> Target] | each overUtilized datanode => one or more underUtilized datanodes |
第二步[Source -> Target] | match each remaining overutilized datanode => one or more belowAvgUtilized datanodes |
第三步[Target -> Source] | each remaining underutilized datanode (step 1未和overUtilized匹配过) => one or more aboveAvgUtilized datanodes |
line 467 @org.apache.hadoop.hdfs.server.balancer.Balancer.java
/** Decide all <source, target> pairs according to the matcher. */
private void chooseStorageGroups(final Matcher matcher) {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
chooseStorageGroups(overUtilized, underUtilized, matcher);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
/* match each remaining underutilized datanode (target) to
* above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
}
构建每一对<source, target>时,需要计算当前可以迁移或者迁入的空间大小。dispatcher创建dispatchExecutor线程池执行数据迁移调度。
line 531 @org.apache.hadoop.hdfs.server.balancer.Balancer.java
private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
final Task task = new Task(target, size);
source.addTask(task);
target.incScheduledSize(task.getSize());
dispatcher.add(source, target);
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+ source.getDisplayName() + " to " + target.getDisplayName());
}
6. 执行balance
###6.1 预计算迁移数据量
节点名称 | node2 | node3 |
---|---|---|
容量 | 1597G | 1597G |
总容量 | 3194G | 3194G |
使用量 | 292G | 18G |
集群平均使用率 (average) |
9.7057 | 9.7057 |
单个datanode使用率 (utilization) |
18.2843 | 1.1271 |
单个datanode使用率与集群平均使用率差值 ( utilizationDiff) |
8.5786 | -8.5786 |
单个datanode utilizationDiff与阈值的差值(thresholdDiff ) | 3.5786 | 3.5786 |
需要迁移或者可以迁入的空间 (maxSizeToMove ) |
57.1502 | 57.1502 |
utilizationDiff>0 & thresholdDiff >0 |
utilizationDiff<0 & thresholdDiff >0 |
|
迁出数据 | 迁入数据 |
指标 | 计算 |
---|---|
集群平均使用率 | 各datanode已使用空间相加的总和 *100 / 各datanode总空间相加总和 |
average | totalUsedSpaces * 100 / totalCapacities |
9.7057% | (292+18)*100/3194 |
单个datanode使用率 | 当前 datanode dfs 已使用空间 *100.0 / 当前 datanode 总空间 | |
---|---|---|
utilization | dfsUsed * 100.0 / capacity | |
node2 | 18.2843 % | (292*100)/1597 |
node3 | 1.1271% | (18*100)/1597 |
单个datanode使用率与集群平均使用率差值 | 单个datanode使用率 – 集群平均使用率 | |
---|---|---|
utilizationDiff | utilization – average | |
node2 | 8.5786% | 18.2843% – 9.7057% |
node3 | -8.5786% | 1.1271% – 9.7057% |
单个datanode| utilizationDiff与阈值的差值 | |单个datanode使用率与集群平均使用率差值| – 阈值 | |
---|---|---|
thresholdDiff | | utilizationDiff| – threshold | |
node2 | 3.5786% | 8.5786% – 5% |
node3 | 3.5786% |
需要迁移或者可以迁入的空间 | |单个datanode使用率与阈值差值| *当前 datanode 容量 | |
---|---|---|
maxSizeToMove | |thresholdDiff | * 1597 | |
node2 | 57.1502 | 3.5786% * 1597 |
node3 | 57.1502 | 3.5786% * 1597 |
6.2 执行balance
balance 命令
hdfs balancer -fs hdfs://node1:8020 -threshold 5
打印日志
2019-7-10 14:56:18 0 4.68 GB 57.20 GB 10 GB
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.movedWinWidth = 5400000 (default=5400000)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.moverThreads = 1000 (default=1000)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.dispatcherThreads = 200 (default=200)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.datanode.balance.max.concurrent.moves = 50 (default=50)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.getBlocks.size = 2147483648 (default=2147483648)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.getBlocks.min-block-size = 10485760 (default=10485760)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.max-size-to-move = 10737418240 (default=10737418240)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.blocksize = 134217728 (default=134217728)
结束balance之后计算迁移了多少数据量 77.21-18.49=58.72G 与我们计算的结果大约相差1G左右
7. 结语
对于一些大型的HDFS集群(随时可能扩容或下架服务器),balance脚本需要作为后台常驻进程;
根据官方建议,脚本需要部署在相对空闲的服务器上;
停止脚本通过kill进程实现(建议不kill,后台运行完会自动停止,多次执行同时也只会有一个线程存在,其它自动失败);
针对datanode存储维护,可以针对以下几个方向进行优化:
- 通过参数(idleiterations)增加迭代次数,以增加datanode允许迁移的数据;
- 通过参数(exclude, include)设计合理的允许进行balance策略的服务器,比如将使用率最低(20%)和最高(20%)的进行balance策略;
- 通过参数(threshold )设计合理的阈值;