1. 介绍
在Hadoop应用过程中,处理小文件问题是一项常见的挑战。由于HDFS主要针对大型数据集(M字节以上)设计,大量小文件的出现可能导致Namenode内存使用效率下降、RPC调用速度减慢、block扫描处理速度降低,从而影响整个应用层的性能。本文旨在阐明小文件存储问题的定义,并探讨解决小文件问题的方法和策略。
2. 什么是小文件
小文件是指比 HDFS 默认的 block 大小(默认配置为 128MB)明显小的文件。需要注意的是,在 HDFS 上有一些小文件是不可避免的。这些文件如库 jars、XML 配置文件、临时暂存文件等。但当小文件变的大量,以至于集群中小文件成为主流,此时就需要对小文件进行治理,治理的目标是让文件大小尽可能接近 HDFS block 大小,或HDFS block 大小的倍数。
Hadoop 的存储层和应用层的设计并不是为了在大量小文件的情况下高效运行。在说到这个问题的意义之前,我们先来回顾一下 HDFS 是如何存储文件的。
在 HDFS 中数据和元数据是独立的实体。文件被分割成 block,这些 block 被存储在 DataNode 的本地文件系统中,并在整个集群中复制。HDFS 命名空间树和相关的元数据作为对象保存在 NameNode 的内存中(会持久化到磁盘上),每个对象一般占用大约 150 个字节。
下面的两个方案说明了小文件的问题。
- 方案 1(1 个 192M 的大文件)
其中 inode(索引节点)是文件系统中的一个重要概念,用于存储关于文件或目录的元数据。在HDFS(Hadoop分布式文件系统)中,每个文件都有一个对应的inode。一个文件的inode包含了关于该文件的一些重要信息,如文件大小、权限、创建时间、修改时间、所有者等。
当我们在文件系统中创建、修改或访问一个文件时,操作系统会使用inode来查找和管理这个文件。在HDFS中,1个文件的inode表示的是该文件所对应的inode,用于存储和管理文件的元数据。
- 方案 2(192 个小文件,每个 1M 的小文件)。
从上面的图示中可以看到, 同样是 192M 的文件,一个大文件与一堆小文件相比,在 Namenode 堆上需要的内存相差 100 倍以上。
3. 对存储层的影响
当 NameNode 重启时,它必须将文件系统元数据从本地磁盘加载到内存中。这意味着,如果 NameNode 的元数据很大,重启速度会非常慢。NameNode 还必须跟踪集群上的 block 位置的变化,太多的小文件也会导致 NameNode 在 DataNode 耗尽磁盘上的数据空间之前,就先耗尽内存中的元数据空间。DataNode 还会通过网络向 NameNode 报告块的变化;更多的 block 意味着要通过网络发送更多的元数据变更。
更多的文件意味着更多的读取请求需要请求 NameNode,这可能最终会堵塞 NameNode 的容量,增加 RPC 队列和处理延迟,进而导致性能和响应能力下降。按照经验,RPC 工作负载接近 40K~50K 的 RPCs/s 是比较高的。
4. 对应用层的影响
一般来说,在通过 Impala 这样的 Ad HOC SQL 引擎或 MapReduce 或 Spark 这样的应用框架运行计算时,拥有大量的小文件会产生更多的磁盘请求。
4.1 MapReduce / Spark
在 Hadoop 中,block 是可以进行计算的最细粒度的数据单位。因此,它影响着一个应用的吞吐量。在 MapReduce 中,每读取一个 block 都需要 1 个 Map Container。因此,小文件会降低性能,增加应用开销,因为每个任务都需要自己的 JVM 进程。
对于 Spark 来说,小文件也是类似的,在 Spark 中,每个“map”相当于 Spark 任务在执行器中每次读取和处理一个分区。每个分区默认情况下是一个 block。这意味着,如果你有很多小文件,每个文件都在不同的分区中读取,这将导致大量的任务开销。
另外,MapReduce 作业也会创建空间文件,如_SUCCESS 和 _FAILURE,用于标记 MapReduce 任务的 finish 状态。这些文件仍然会在 Namenode 中注册为一个 inode item,如前文所述,每个 使用 150 个字节。清除这些文件的一个简单有效的方法是使用下面的 HDFS 命令:
hdfs dfs -ls -R | awk '$1 !~ /^d/ && $5 == "0" { print $8 }' | xargs -n100 hdfs dfs –rm
命令说明:
hdfs dfs -ls -R
:这个命令用于递归地列出HDFS中的所有文件和目录,包括子目录下的文件。-ls
表示列出文件和目录的信息,-R
表示递归地进行操作。
|
:这是管道操作符,用于将上一个命令的输出作为下一个命令的输入。
awk '$1 !~ /^d/ && $5 == "0" { print $8 }'
:这个命令使用Awk工具对hdfs dfs -ls -R
的输出进行处理。它通过匹配条件来筛选出不是目录(即文件)且文件大小为0的行,并提取第8个字段(即文件路径)进行输出。
|
:再次使用管道操作符将上一个命令的输出作为下一个命令的输入。
xargs -n100 hdfs dfs –rm
:这个命令使用xargs工具将前面命令的输出作为参数,并对每100个文件执行hdfs dfs -rm
命令,即删除这些文件。
xargs -n100
:指定每次传递给下一个命令的参数数量为100。
hdfs dfs -rm
:删除HDFS中指定的文件。
这个命令的功能是在HDFS中递归地列出所有文件,并找到文件大小为0的文件。然后,通过批量处理的方式,每次删除100个大小为0的文件。这个命令对于清理HDFS中的空文件非常有用。
注意:
这个命令将把这些文件移动到.Trash 位置(HDFS 回收站开启的情况下),一旦 Trash 清理策略生效,这些文件也将随之删除。
如果有应用程序对这些文件有依赖性,删除这些文件可能会导致应用程序失败。
5. 小文件是如何产生的
5.1 流式数据处理
spark streaming/flink 等流式计算框架或者 bacth 的数据计算,最终可能会一段时间内产生大量的小文件。对于流式数据的近乎实时的要求,小的 timewindow(每隔几分钟或几个小时)数量较小,就会生产很多小文件。
5.2 拥有大量 map/reduce 的任务
MapReduce 任务,如果有大量的 map 和 reduce task,在 HDFS 上生成的文件基本上与 map 数量(对于 Map-Only 作业)或 reduce 数量(对于 MapReduce 作业)成正比。大量的 reducer 没有足够的数据被写到 HDFS 上,会把结果集稀释成很小的文件,因为每个 reducer 只写一个文件。按照同样的思路,数据倾斜也会产生类似的效果,即大部分数据被路由到一个或几个 reduce,让其他的 reduce 写的数据很少,导致文件变小。
5.3 过度分区表
过度分区表是指每个分区的数据量很小(<256 MB)的 Hive 表。Hive Metastore Server (HMS) API 调用开销会随着表拥有的分区数量而增加。这反过来会导致性能下降。在这种情况下,应该考虑表的分区设计并减少分区粒度。
5.4 Spark 过度并行化
在 Spark 作业中,根据写任务中提到的分区数量,每个分区会写一个新文件。这类似于 MapReduce 框架中的每个 reduce 任务都会创建一个新文件。Spark 分区越多,写入的文件就越多。控制分区的数量来减少小文件的生成。
5.5 文件格式和压缩
出于小文件治理的目的,我们更推荐使用非 TexFile 的序列化存储方法。
6. 如何识别出小文件
6.1 fsImage
因为 NameNode 存储了所有与文件相关的元数据,所以它将整个命名空间保存在内存中,而 fsimage 是 NameNode 的本地本机文件系统中的持久化记录。因此,我们可以通过分析 fsimage 来找出文件的元信息。
- fsimage 中可用的字段有:
字段名 | 描述 |
---|---|
Path | 文件或目录在文件系统中的位置 |
Replication | 文件在HDFS中的副本数量 |
ModificationTime | 文件或目录的最后修改时间 |
AccessTime | 文件或目录的最后访问时间 |
PreferredBlockSize | 文件在HDFS中的数据块的大小 |
BlocksCount | 文件所包含的数据块数量 |
FileSize | 文件的大小 |
NSQUOTA | 目录中文件和目录的最大数量限制 |
DSQUOTA | 目录中文件和目录的总大小限制 |
Permission | 文件或目录的访问权限 |
UserName | 文件或目录的所有者用户名 |
GroupName | 文件或目录所属的用户组名 |
通常可以采用以下方法来解析 fsimage
拷贝 Namenode 数据目录下的 fsimage 文件到其他目录,然后执行:
hdfs oiv -p Delimited -delimiter "|" -t /tmp/tmpdir/ -i fsimage_copy -o fsimage.out
这个命令是HDFS中的 “oiv” 命令,用于从 fsimage 文件中提取元数据信息并以指定的格式进行输出。下面是对该命令的解释:
hdfs oiv
: 这是用于运行HDFS Offline Image Viewer(离线镜像查看器)的HDFS命令。-p Delimited
: 这是 -oiv 命令的一个选项,指定输出格式为分隔符分隔的格式,以便将元数据字段以自定义的分隔符进行分隔。-delimiter "|"
: 这是指定分隔符的参数,该示例中使用竖线(|)作为分隔符。-t /tmp/tmpdir/
: 这是指定临时目录的参数,用于存储临时文件。-i fsimage_copy
: 这是指定输入的fsimage文件的参数,即要处理的fsimage文件的路径和名称。-o fsimage.out
: 这是指定输出文件的参数,即处理后的元数据将写入的输出文件的路径和名称。
综合起来,这个命令的作用是从指定的fsimage文件中提取元数据信息,并将元数据以分隔符分隔的格式输出到指定的输出文件中(使用竖线作为分隔符)。输出文件的路径和名称是”fsimage.out”,临时文件将存储在”/tmp/tmpdir/“目录中。
请注意,此命令需要在Hadoop集群的环境中运行,并且需要适当的权限才能访问和处理HDFS的元数据。
更多关于 hdfs oiv 命令的使用,可以查看 useage。
6.2 fsck
使用 fsck 命令扫描当前的 HDFS 目录并保存扫描后的信息可以查看当前 HDFS 中的小文件
hdfs fsck / -files -blocks -locations > fsck_output.txt
/
是要扫描的HDFS目录的路径,这里使用根目录 /
作为示例,您可以根据实际情况替换为其他目录路径。-files
参数用于打印文件的详细信息。-blocks
参数用于打印数据块的详细信息。-locations
参数用于打印数据块的位置信息。fsck_output.txt
是保存扫描结果的输出文件名,您可以根据需要自定义输出文件的路径和名称。
执行完命令后,fsck
命令将扫描指定的HDFS目录,并将扫描结果保存到指定的输出文件中。
可以使用文本编辑器或命令行查看保存的输出文件来检查扫描后的信息。
请注意,执行fsck
命令需要适当的权限来访问和扫描HDFS目录。此外,fsck
命令可能会执行较长时间,具体取决于HDFS集群的规模和文件系统的大小。在大型集群中,考虑生产环境的稳定性,不建议使用 fsck 命令,因为它会带来额外的开销。
7. 如何处理小文件
7.1 流式写入
调整流式写入的时间窗口是一个不错的选择,如果业务对实时性要求很高,那么可以根据数据类型(非结构化 vs 结构化)、append/update 频率和数据使用模式(随机读取 vs 聚合),HBase 和 Kudu 是存储层的更好选择。对于已经存在的小文件,也可以设置定期的 Job 对这些文件进行压缩、合并,以减少文件量和文件数量。
7.2 过度分区表
在决定分区的粒度时,要考虑到每个分区的数据量。为有大文件的分区做计划(用 Parquet 的话,约 256MB 或更大),即使这意味着有较少的粒度分区,例如每月而不是每天的分区。对于数据量小的表(几百 MB),可以考虑创建一个非分区表。
7.3 Spark 过度并行化
在 Spark 中向 HDFS 写入数据时,在向磁盘写入数据前要重新分区或聚合分区。这些语句中定义的分区数量将决定输出文件的数量。强烈建议检查 Spark 作业的输出,并验证创建的文件数量和实现的吞吐量。
7.4 使用merge 命令压缩
hadoop 本身提供 merge 命令,当然用户也可以自行编写工具实现。
Hadoop提供的merge
命令用于合并多个小文件为一个或多个更大的文件。这对于减少小文件数量和提高文件系统性能非常有用。下面是对merge
命令的介绍:
hadoop fs -getmerge <src> <localdst> [addnl]
参数说明:
<src>
:指定要合并的源目录或文件的路径。它可以是HDFS中的目录或文件。<localdst>
:指定合并后的文件的本地目标路径,即保存合并后文件的本地文件系统路径。[addnl]
:可选参数,当指定该参数时,将追加源文件名作为每个合并后的文件的一部分,以在合并后的文件中保留原始文件的边界。
注意事项:
<src>
参数可以指定一个目录,此时会递归地合并目录下的所有文件。<src>
参数也可以指定多个文件,以空格分隔,这样可以将这些文件合并成一个更大的文件。- 如果
<localdst>
指定的本地文件已存在,将会覆盖该文件。
使用示例:
- 合并HDFS中的多个文件到本地文件:
hadoop fs -getmerge /user/hadoop/input /home/user/mergedfile
上述命令将HDFS中/user/hadoop/input
目录下的所有文件合并为一个文件,并保存到本地文件系统的/home/user/mergedfile
路径下。
- 合并HDFS目录下的多个文件到本地文件,并保留源文件名:
hadoop fs -getmerge /user/hadoop/input /home/user/mergedfile true
该命令将目录/user/hadoop/input
下的所有文件合并为一个文件,并在合并后的文件中保留源文件的边界。
使用merge
命令可以方便地将多个小文件合并成一个更大的文件,有助于提高Hadoop文件系统的性能和效率。
7.5 使用 Hive 对数据进行压缩
如果你有一个现有的 Hive 表有大量的小文件,那么可以通过以下设置来重写这个表(parquet 格式)。关于 Hive 压缩可以查阅其他文档获取更详细的信息。
# 启用对输出结果进行压缩。当查询产生输出结果时,将使用压缩算法对结果进行压缩,以减少存储空间。
set hive.exec.compress.output=true;
# 设置Parquet文件的压缩算法为Snappy。Parquet是一种列式存储格式,该参数指定使用Snappy算法对Parquet文件进行压缩。
set parquet.compression=snappy;
# 启用Map端文件合并。当执行Hive查询时,Map任务在输出文件到HDFS时,会尝试将多个小文件合并成更大的文件,以减少存储和IO开销。
set hive.merge.mapfiles=true;
# 启用MapReduce端文件合并。类似于上一个参数,但这个参数控制在MapReduce任务的输出阶段进行文件合并。
set hive.merge.mapredfiles=true;
# 设置合并小文件的平均大小。当启用文件合并时,Hive将尝试合并小于该大小的文件。在这个例子中,平均大小为128MB。
set hive.merge.smallfiles.avgsize=134217728;
# 设置每个任务执行合并操作的最大数据大小。当合并操作发生时,Hive将尝试合并小于该大小的文件。
set hive.merge.size.per.task = 268435456;
# 启用动态分区排序优化。当对分区表进行查询时,启用此参数可以提高性能,通过对查询结果进行排序来优化分区读取。
set hive.optiming.sort.dynamic.partition = true;
# 设置Parquet文件的块大小。Parquet文件由多个块组成,该参数指定每个块的大小。在这个例子中,块大小为256MB。
set parquet.blocksize= 268435456;
# 设置HDFS文件系统的块大小。HDFS将数据划分为多个块进行存储,该参数指定每个块的大小。在这个例子中,块大小为256MB。
set dfs.block.size=268435456;
关于 Hive 配置属性的详细信息可以在 Apache Hive 官方页面上找到,这里再提供一些重新数据的方法。
CREATE TABLE AS SELECT(CTAS)对于一个非分区表会比较方便。
你也可以先运行 CREATE TABLE LIKE (CTL)来复制一个表结构,然后使用 INSERT OVERWRITE SELECT 语句将数据从源表加载数据到目标表。
注意:如果在没有定义静态分区名的情况下插入数据,需要在 Hive 中启用非严格的动态分区模式,可以通过设置
set hive.exec.dynamic.partition.mode=non-strict
# 用于设置动态分区模式的行为。动态分区是指在将数据加载到分区表时,根据数据中的某个列的值自动创建分区。
分区列必须是选择语句中的最后一列,这样动态分区才能在这种情况下工作。此外,也可以直接使用 mapred.reduce.tasks 设置来配置 reduce 的数量。创建的文件数量将等于使用的减速器数量。设置一个最佳的减速器值取决于写入的数据量。
8. 总结
提前规避要好于事后补救,在任务开发以及表设计的前期尽可能考虑小文件问题尤为重要,因为事后修改任务或者修改表带来业务失败的概率会大得多。此外,小文件治理也是一个长期的过程,对于一个生产集群,定期的进行小文件治理是必要的。
数据资产中心将小文件数量或比例转化成指数关联到库,表,目录上。用户可以根据库,表,目录等信息发现小文件产生的任务,对小文件的产生进行追本溯源,然后通过调整任务参数等手段从源头进行治理。
追本溯源可以从源头切断小文件的产生,但是这个是比较理想化的情况,现实中存在很多场景无法在源头进行调整。网易数据资产中心也提供了定期触发的小文件合并策略,在策略识别到小文件过多的表或者目录上进行小文件合并。对于已经产生了很多小文件的表或目录提供主动合并的手段将小文件进行合并。