一次HDFS balance的记录


背景

前段时间由于集群磁盘吃紧做了一次对HDFS上文件副本缩减的操作,导致集群的DataNode间数据不均衡,所以需要做一次rebalance。

1. balance参数介绍

hdfs balancer –help
参数 说明
-policy <policy> 平衡策略:datanode或blockpool,默认为datanode级别的平衡策略
-threshold <threshold> 判断集群是否平衡的目标参数,每个datanode存储使用率和集群总存储使用率的差值都应该小于这个阈值,理论上这个参数设置的越小,整个集群就越平衡,但是在线上环境中,Hadoop集群在进行balance时,还在并发的进行写入和删除数据,所以可能无法达到设置的平衡参数值。参数区间为 0~100。
-exclude
[-f <hosts-file> | <comma-separated list of hosts>]
默认为空,指定该部分ip不参与balance,
-f:指定输入为文件
-include
[-f <hosts-file> | <comma-separated list of hosts>]
默认为空,只允许该部分ip参与balance,
-f:指定输入为文件
-source
[-f <hosts-file> | <comma-separated list of hosts>]
默认为空,仅选择指定的数据节点作为源节点。
-f:指定输入为文件
-blockpools <comma-separated list of blockpool ids> 平衡器 仅在blockpool模式运行
-idleiterations <idleiterations> 迭代次数,默认为5 ,-1为无限次,到集群平衡为止
-runDuringUpgrade 是否在正在进行的HDFS升级期间运行平衡器。这通常是不希望的,因为它不会影响过度使用的机器上的已用空间。

注意:该工具反复的将块从高利用率的数据节点移动到使用低的数据节点。 在每次迭代中,datanode移动或接收的数量不超过 MIN(10G,容量的阈值)。 每次迭代运行不超过20次分钟。 在每次迭代结束时,平衡器获得来自namenode的最新的数据节点的信息。

Block Pool(块池)
所谓Block pool(块池)就是属于单个命名空间的一组block(块)。每一个datanode为所有的block pool存储块。Datanode是一个物理概念,而block pool是一个重新将block划分的逻辑概念。同一个datanode中可以存着属于多个block pool的多个块。Block pool允许一个命名空间在不通知其他命名空间的情况下为一个新的block创建Block ID。同时,一个Namenode失效不会影响其下的datanode为其他Namenode的服务。
当datanode与Namenode建立联系并开始会话后自动建立Block pool。每个block都有一个唯一的标识,这个标识我们称之为扩展的块ID(Extended Block ID)= BlockID+BlockID。这个扩展的块ID在HDFS集群之间都是唯一的,这为以后集群归并创造了条件。
Datanode中的数据结构都通过块池ID(BlockPoolID)索引,即datanode中的BlockMap,storage等都通过BPID索引。
在HDFS中,所有的更新、回滚都是以Namenode和BlockPool为单元发生的。即同一HDFS Federation中不同的Namenode/BlockPool之间没有什么关系。
Hadoop V0.23版本中Block Pool的管理功能依然放在了Namenode中,将来的版本中会将Block Pool的管理功能移动的新的功能节点中。

2. 迁移数据效率优化

1.增加带宽

修改 hdfs-site.xml 中 dfs.balance.bandwidthPerSec 参数
参数含义:设置balance工具在运行中所能占用的带宽,设置的过大可能会造成mapred运行缓慢,默认设置:10M
修改有如下两种方式:

  1. 在配置文件中修改带宽 (修改需重启hdfs):
<property>
  <name>dfs.datanode.balance.bandwidthPerSec</name>
  <!--<备注:62500000 / (1024 * 1024) = 60M/s>-->
  <value>62500000</value>
</property>
  1. 动态增大带宽(不需重启)

需要切换到hdfs用户,不可设置太大,会占用mapreduce任务的带宽

hdfs dfsadmin -fs hdfs://node1:8020 -setBalancerBandwidth 62500000

单位是Byte,这个示情况而定,如果交换机性能好点的,完全可以设定为100MB,如果机器的网卡和交换机的带宽有限,可以适当降低该速度。

2.增加最大线程数

修改hdfs-site.xml 中 dfs.datanode.max.transfer.threads = 4096
(如果运行hbase的话建议为16384),指定用于在DataNode间传输block数据的最大线程数,老版本的对应参数为dfs.datanode.max.xcievers

3.增加用于balance待移动block的最大线程数

修改hdfs-site.xml 中 dfs.datanode.balance.max.concurrent.moves = 50

指定DataNode上同时用于balance待移动block的最大线程个数,这个值默认是50

如果配置没生效或者不合理的话,Balancer会有如下警告信息:
16/05/17 11:54:59 WARN balancer.Dispatcher: Failed to move blk_1075360746_1920035 with size=134217728 from xx.xx.xx.48:50010:DISK to xx.xx.xx.xx:50010:DISK through xx.xx.xx.xx:50010: Got error, status message opReplaceBlock BP-647596829-xx.xx.xx.xx-1448614319339:blk_1075360746_1920035 received exception java.io.IOException: Got error, status message Not able to copy block 1075360746 to /xx.xx.xx.xx:39630 because threads quota is exceeded., copy block BP-647596829-xx.xx.xx.xx-1448614319339:blk_1075360746_1920035 from /xx.xx.xx.xx:50010, block move is failed

状态查看

hdfs dfsadmin balancer –daemon status

3. Hadoop Balancer的步骤:

  1. 从namenode获取datanode磁盘的使用情况
  2. 计算需要把哪些数据移动到哪些节点
  3. 分别移动,完成后删除旧的block信息
  4. 循环执行,直到达到平衡标准

4. 退出条件

balance脚本在满足以下任何一个条件都会自动退出:

  1. 集群是平衡的;
  2. 没有块可以移动;
  3. 没有为指定的连续迭代移动块(默认为5);
  4. 与namenode通信时发生IOException;
  5. 另一个平衡器正在运行。

5. 源码解析

源码路径:https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer

5.1 统计需要balance的datanode:


line 1098 @org.apache.hadoop.hdfs.server.balancer.Dispatcher.java

//是否忽略datanode节点
private boolean shouldIgnore(DatanodeInfo dn) {
    // ignore decommissioned nodes (忽略已经下架的datanode)
    final boolean decommissioned = dn.isDecommissioned();
    // ignore decommissioning nodes(忽略正在下架的datanode)
    final boolean decommissioning = dn.isDecommissionInProgress();
    // ignore nodes in exclude list (忽略参数:-exclude配置的datanode)
    final boolean excluded = Util.isExcluded(excludedNodes, dn);
    // ignore nodes not in the include list (if include list is not empty)
    // (如果参数:-include不为空,忽略不在include列表里的datanode)
    final boolean notIncluded = !Util.isIncluded(includedNodes, dn);

   // 如果上述条件有一个忽略条件为true则忽略该datanode
    if (decommissioned || decommissioning || excluded || notIncluded) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Excluding datanode " + dn
            + ": decommissioned=" + decommissioned
            + ", decommissioning=" + decommissioning
            + ", excluded=" + excluded
            + ", notIncluded=" + notIncluded);
      }
      return true;
    }
    return false;
  }

5.2 计算集群平均使用率

指标 说明
totalUsedSpaces 各datanode已使用空间(dfsUsed,不包含non dfsUsed)相加的总和;
totalCapacities 各datanode总空间(DataNode配置的服务器磁盘目录)相加的总和;

集群平均使用率 各datanode已使用空间相加的总和 *100 / 各datanode总空间相加总和
average totalUsedSpaces * 100 / totalCapacities
line 54 @org.apache.hadoop.hdfs.server.balancer.BalancingPolicy.java

void initAvgUtilization() {
    for(StorageType t : StorageType.asList()) {
      final long capacity = totalCapacities.get(t);
      if (capacity > 0L) {
        final double avg  = totalUsedSpaces.get(t)*100.0/capacity;
        avgUtilizations.set(t, avg);
      }
    }
  }

5.3 计算单个DataNode使用率

指标 说明
dfsUsed 当前datanode dfs(dfsUsed,不包含non dfsUsed)已使用空间;
capacity 当前datanode(DataNode配置的服务器磁盘目录)总空间;
单个datanode使用率 = 当前 datanode dfs 已使用空间 *100.0 / 当前 datanode 总空间
utilization = dfsUsed * 100.0 / capacity
line 71 @org.apache.hadoop.hdfs.server.balancer.BalancingPolicy.java

Double getUtilization(DatanodeStorageReport r, final StorageType t) {
      long capacity = 0L;
      long dfsUsed = 0L;
      for(StorageReport s : r.getStorageReports()) {
        if (s.getStorage().getStorageType() == t) {
          capacity += s.getCapacity();
          dfsUsed += s.getDfsUsed();
        }
      }
      return capacity == 0L? null: dfsUsed*100.0/capacity;
    }
指标 说明
单个datanode使用率与集群平均使用率差值 单个datanode使用率集群平均使用率
utilizationDiff utilizationaverage

单个datanode utilizationDiff与阈值的差值 | 单个datanode使用率与集群平均使用率差值| – 阈值
thresholdDiff | utilizationDiff| – threshold

需要迁移或者可以迁入的空间 | 单个datanode使用率与集群平均使用率差值| *当前datanode总空间
maxSizeToMove | utilizationDiff| * capacity

可以迁入的空间计算:Math.min(remaining, maxSizeToMove)
需要迁移的空间计算:Math.min(max, maxSizeToMove)
remaining:datanode节点剩余空间
max:默认单个datanode单次balance迭代可以迁移的最大空间限制,缺省10G)
默认迭代次数为5,即运行一次balance脚本,单个datanode可以最大迁移的空间为:5*10G = 50G

差值判断后datanode的保存队列:

指标 比较 比较说明 类型
overUtilized utilizationDiff > 0 && thresholdDiff > 0 使用率超过平均值,且差值大于阈值 需要迁出数据的节点(source类型)
underUtilized utilizationDiff < 0 && thresholdDiff > 0 使用率低于平均值,且差值大于阈值 需要迁入数据的节点(Target类型)
aboveAvgUtilized utilizationDiff > 0 && thresholdDiff <= 0 使用率超过平均值,且差值小于等于阈值 需要迁出数据的节点(source类型)
belowAvgUtilized utilizationDiff < 0 && thresholdDiff <= 0 使用率低于平均值,且差值小于等于阈值 需要迁入数据的节点(Target类型)

5.4 数据迁移配对原则与步骤

原则

  1. 优先为同机架,其次为其它机架;
  2. 一对多配对
步骤 操作
第一步[Source -> Target] each overUtilized datanode => one or more underUtilized datanodes
第二步[Source -> Target] match each remaining overutilized datanode => one or more belowAvgUtilized datanodes
第三步[Target -> Source] each remaining underutilized datanode (step 1未和overUtilized匹配过) => one or more aboveAvgUtilized datanodes

line 467 @org.apache.hadoop.hdfs.server.balancer.Balancer.java

/** Decide all <source, target> pairs according to the matcher. */
  private void chooseStorageGroups(final Matcher matcher) {
    /* first step: match each overUtilized datanode (source) to
     * one or more underUtilized datanodes (targets).
     */
    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
    chooseStorageGroups(overUtilized, underUtilized, matcher);

    /* match each remaining overutilized datanode (source) to 
     * below average utilized datanodes (targets).
     * Note only overutilized datanodes that haven't had that max bytes to move
     * satisfied in step 1 are selected
     */
    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
    chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);

    /* match each remaining underutilized datanode (target) to 
     * above average utilized datanodes (source).
     * Note only underutilized datanodes that have not had that max bytes to
     * move satisfied in step 1 are selected.
     */
    LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
    chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
  }

构建每一对<source, target>时,需要计算当前可以迁移或者迁入的空间大小。dispatcher创建dispatchExecutor线程池执行数据迁移调度。


line 531 @org.apache.hadoop.hdfs.server.balancer.Balancer.java

private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
    long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
    final Task task = new Task(target, size);
    source.addTask(task);
    target.incScheduledSize(task.getSize());
    dispatcher.add(source, target);
    LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
        + source.getDisplayName() + " to " + target.getDisplayName());
  }

6. 执行balance

查看执行前节点数据量

###6.1 预计算迁移数据量

节点名称 node2 node3
容量 1597G 1597G
总容量 3194G 3194G
使用量 292G 18G
集群平均使用率
average
9.7057 9.7057
单个datanode使用率
utilization
18.2843 1.1271
单个datanode使用率与集群平均使用率差值
utilizationDiff
8.5786 -8.5786
单个datanode utilizationDiff与阈值的差值(thresholdDiff 3.5786 3.5786
需要迁移或者可以迁入的空间
maxSizeToMove
57.1502 57.1502
utilizationDiff>0
&
thresholdDiff >0
utilizationDiff<0
&
thresholdDiff >0
迁出数据 迁入数据

指标 计算
集群平均使用率 各datanode已使用空间相加的总和 *100 / 各datanode总空间相加总和
average totalUsedSpaces * 100 / totalCapacities
9.7057% (292+18)*100/3194

单个datanode使用率 当前 datanode dfs 已使用空间 *100.0 / 当前 datanode 总空间
utilization dfsUsed * 100.0 / capacity
node2 18.2843 % (292*100)/1597
node3 1.1271% (18*100)/1597

单个datanode使用率与集群平均使用率差值 单个datanode使用率集群平均使用率
utilizationDiff utilizationaverage
node2 8.5786% 18.2843% – 9.7057%
node3 -8.5786% 1.1271% – 9.7057%

单个datanode| utilizationDiff与阈值的差值 |单个datanode使用率与集群平均使用率差值| – 阈值
thresholdDiff | utilizationDiff| – threshold
node2 3.5786% 8.5786% – 5%
node3 3.5786%

需要迁移或者可以迁入的空间 |单个datanode使用率与阈值差值| *当前 datanode 容量
maxSizeToMove |thresholdDiff | * 1597
node2 57.1502 3.5786% * 1597
node3 57.1502 3.5786% * 1597

6.2 执行balance

balance 命令

hdfs balancer -fs hdfs://node1:8020 -threshold 5

打印日志

2019-7-10 14:56:18 0 4.68 GB 57.20 GB 10 GB
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.movedWinWidth = 5400000 (default=5400000)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.moverThreads = 1000 (default=1000)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.dispatcherThreads = 200 (default=200)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.datanode.balance.max.concurrent.moves = 50 (default=50)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.getBlocks.size = 2147483648 (default=2147483648)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.getBlocks.min-block-size = 10485760 (default=10485760)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.balancer.max-size-to-move = 10737418240 (default=10737418240)
19/07/10 14:56:27 INFO balancer.Balancer: dfs.blocksize = 134217728 (default=134217728)

执行之后数据分布情况

结束balance之后计算迁移了多少数据量 77.21-18.49=58.72G 与我们计算的结果大约相差1G左右

7. 结语

  1. 对于一些大型的HDFS集群(随时可能扩容或下架服务器),balance脚本需要作为后台常驻进程;

  2. 根据官方建议,脚本需要部署在相对空闲的服务器上;

  3. 停止脚本通过kill进程实现(建议不kill,后台运行完会自动停止,多次执行同时也只会有一个线程存在,其它自动失败);

针对datanode存储维护,可以针对以下几个方向进行优化:

  1. 通过参数(idleiterations)增加迭代次数,以增加datanode允许迁移的数据;
  2. 通过参数(exclude, include)设计合理的允许进行balance策略的服务器,比如将使用率最低(20%)和最高(20%)的进行balance策略;
  3. 通过参数(threshold )设计合理的阈值;

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
关于maven 编译时source 1.5 中不支持 lambda 表达式 的问题记录 关于maven 编译时source 1.5 中不支持 lambda 表达式 的问题记录
1. 错误说明首先贴上错误截图 原因是Maven Compiler 插件默认会加 -source 1.5 及 -target 1.5 参数来编译 当我们使用1.8 中的lambda 表达式时需要将source 版本调高 2. 解决办法在p
2019-09-11
下一篇 
HDFS 负载均衡策略 HDFS 负载均衡策略
1. Hadoop HDFS介绍Hadoop 分布式文件系统(Hadoop Distributed File System),简称 HDFS,被设计成适合运行在通用硬件上的分布式文件系统。它和现有的分布式文件系统有很多的共同点。HDFS 是
2019-08-29
  目录