HDFS 中常见的数据格式 2.面向列的文件格式介绍


1. 列式存储

列式存储, 顾名思义就是按照列进行存储, 把某一列的数据连续的存储, 每一行中的不同列的值离散分布, 列式存储技术并不新鲜, 在关系数据库中都已经在使用, 尤其是在针对OLAP场景下的数据存储, 由于OLAP场景下的数据大部分情况下都是批量导入, 基本上不需要支持单条记录的增删改操作, 而查询的时候大多数都是只使用部分列进行过滤, 聚合, 对少数列进行计算( 基本不需要 select * from tname 之类的查询). 列式存储可以大大提升这类查询的性能, 交之于行式存储能够带来这些优化:

  • 由于每一列的数据类型相同, 所以可以针对不同类型的列使用不同的编码和压缩方式, 这样可以大大降低数据存储空间.

  • 读取数据的时候可以把映射(project) 下推, 只需要读取需要的列, 这样可以大大减少每次查询的I/O 数据量, 更深圳可以支持谓词下推, 跳过不满足条件的列.

  • 由于每一列的数据类型相同, 可以使用更加适合CPU pipeline的编码方式, 减小CPU的缓存失效

2. Parquet

2.1 Parquet是什么

Parquet 的灵感来自于2010年Google发表的 Dremel论文 , 文中介绍了一种支持嵌套结构的存储格式, 并且使用了列式存储的方式提升查询性能, 在Dremel 论文中还介绍了Google如何使用这种存储格式实现并行查询, 如果对此感兴趣可以参考 论文 和开源实现 Apache Drill

2.2 嵌套数据模型

在接触大数据之前, 我们简单的将数据划分为结构化数据和非结构化数据. 通常我们使用关系数据库存储结构化数据, 而关系数据库中使用数据模型都是扁平式的, 遇到诸如List, Map 和自定义的Struct的时候就需要用户在应用层解析, 但是在大数据环境下, 通常数据的来源是服务端的埋点数据, 很可能需要把程序中的某些对象内容作为输出的一部分, 而每一个对象都可能是嵌套的, 所以如果能够原生的支持这种数据, 这样在查询的时候就不需要额外的解析便能够获得想要的结果. 例如在Twitter, 在他们的生产环境中的一个典型的日志对象(一条记录) 有87个字段, 其中嵌套了7层,如下图:

另外, 随着嵌套格式的数据的需求日益增加, 目前hadoop生态圈中主流的查询引擎都支持更丰富的数据类型, 例如 Hive, SparkSQL,Impala等都原生的支持struct, Map,Array这样复杂的数据类型, 这样也就使得parquet 这种原生支持嵌套数据类型的存储格式变得至关重要, 性能也会更好。

2.3 Parquet的组成

Parquet仅仅是一种存储格式, 它是与语言,平台无关的, 并且不需要和任何一种数据处理框架绑定, 目前能够和Parquet是被的组件包括下面这些, 可以看出基本上通常使用的查询引擎和计算框架都已适配, 并且可以很方便的将其它序列化工具生成的数据转换为Parquet格式

  • 查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
  • 计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
  • 数据模型: Avro, Thrift, Protocol Buffers, POJOs

2.4 项目组成

Parquet项目由以下几个子项目组成:

  • parquet-format 项目由java实现,它定义了所有Parquet元数据对象,Parquet的元数据是使用Apache Thrift进行序列化并存储在Parquet文件的尾部。
  • parquet-mr 项目由java实现,它包括多个模块,包括实现了读写Parquet文件的功能,并且提供一些和其它组件适配的工具,例如Hadoop Input/Output Formats、Hive Serde(目前Hive已经自带Parquet了)、Pig loaders等。
  • parquet-compatibility 项目,包含不同编程语言之间(JAVA和C/C++)读写文件的测试代码。
  • parquet-cpp 项目,它是用于用于读写Parquet文件的C++库。

下图展示了Parquet各个组件的层次以及从上到下交互的方式。

  • 数据存储层定义了Parquet的文件格式, 其中元数据在parquet-format中定义, 包括Parquet原始类型定义, Page类型, 编码类型, 压缩类型等等
  • 对象转换层完成其他对象模型与Parquet内部数据模型的映射和转换, Parquet的编码方式使用的是striping and assembly算法
  • 对象模型定义了如何读取Parquet文件的内容, 这一层转换包括 Avro, Thrift, PB 等序列化格式, Hive serde等的适配, 并且为了帮助大家理解和使用, Parquet提供了 org.apache.parqurt.example 包 实现了java对象的Parquet文件的转换

2.5 数据模型

Parquet支持嵌套的数据模型, 类似于Protocol Buffers, 每一个数据模型的schema 包含多个字段, 每个字段又可以包含多个字段, 每一个字段有三个属性: 重复数, 数据类型和字段名, 重复数可以是一下三种: required (出现1次), repeated(出现0次或多次), optional (出现0次或1次). 每个字段的数据类型分为两种: group(复杂类型) 和primitive(基本类型). 例如 Dremel 中提供的Document 的schema 示例, 它的定义如下

message Document {
   required int64 DocId;// 只出现一次

  optional group Links { //出现0次或1次

     repeated int64 Backward; //出现0次或多次
     repeated int64 Forward;//出现0次或多次
   }
   repeated group Name {
      repeated group Language {
        required string Code;
        optional string Country;
      }
      optional string Url;
   }
 }

可以把这个Schema转换成树状结构,根节点可以理解为repeated类型,如下图:

  • 可以看出在Schema中所有的基本类型字段都是叶子节点,在这个Schema中一共存在6个叶子节点,如果把这样的Schema转换成扁平式的关系模型,就可以理解为该表包含六个列。
  • Parquet中没有Map、Array这样的复杂数据结构,但是可以通过repeated和group组合来实现这样的需求。
  • 在这个包含6个字段的表中有以下几个字段和每一条记录中它们可能出现的次数:
DocId int64 只能出现一次
Links.Backward int64 可能出现任意多次,但是如果出现0次则需要使用NULL标识
Links.Forward int64 同上
Name.Language.Code string 同上
Name.Language.Country string 同上
Name.Url string 同上

由于在一个表中可能存在出现任意多次的列,对于这些列需要标示出现多次或者等于NULL的情况,它是由Striping/Assembly算法实现的。

2.6 Striping/Assembly算法

上文介绍了Parquet的数据模型,在Document中存在多个非required列,由于Parquet一条记录的数据分散的存储在不同的列中,如何组合不同的列值组成一条记录是由Striping/Assembly算法决定的,在该算法中列的每一个值都包含三部分:value、repetition level和definition level。

2.7 Repetition Levels

为了支持repeated类型的节点, 在写入的时候该值等于它前面的值在哪一层节点是不共享的, 在读取 的时候根据该值可以推导出那一层上需要创建新的节点, 例如对于这样的一个Schema和两条记录.

message nested {
   repeated group leve1 {
      repeated string leve2;
   }
 }
 r1:[[a,b,c,] , [d,e,f,g]]
 r2:[[h] , [i,j]]

计算repetition level值的过程如下:

  1. value = a 是一条记录的开始, 和面前的值(已经没有值了) 在根节点(第0层) 上是不共享的, 所以 repeated level = 0

  2. value = b 它前面的值已经共享了level1 这个节点, 但是level2这个节点上是不共享的, 所以 repeated level = 2

  3. 同理 value =c , repeated level = 2

  4. value = d 和前面的值共享了根节点(属于相同记录), 但是这个level 1 这个节点上是不共享的, 所以repeated level = 1

  5. value = h 和前面的值不同属于一条记录, 也就是不共享任何节点, 所以repeated level = 0

根据上面的分析, 每个value需要记录的repeated level值 如下:

在读取的时候,顺序的读取每一个值,然后根据它的repeated level创建对象,当读取value=a时repeated level=0,表示需要创建一个新的根节点(新记录),value=b时repeated level=2,表示需要创建一个新的level2节点,value=d时repeated level=1,表示需要创建一个新的level1节点,当所有列读取完成之后可以创建一条新的记录。本例中当读取文件构建每条记录的结果如下:

可以看出repeated level=0表示一条记录的开始,并且repeated level的值只是针对路径上的repeated类型的节点,因此在计算该值的时候可以忽略非repeated类型的节点,在写入的时候将其理解为该节点和路径上的哪一个repeated节点是不共享的,读取的时候将其理解为需要在哪一层创建一个新的repeated节点,这样的话每一列最大的repeated level值就等于路径上的repeated节点的个数(不包括根节点)。减小repeated level的好处能够使得在存储使用更加紧凑的编码方式,节省存储空间。

2.8 Definition Levels

有了repeated level我们就可以构造出一个记录了,为什么还需要definition levels呢?由于repeated和optional类型的存在,可能一条记录中某一列是没有值的,假设我们不记录这样的值就会导致本该属于下一条记录的值被当做当前记录的一部分,从而造成数据的错误,因此对于这种情况需要一个占位符标示这种情况。

definition level的值仅仅对于空值是有效的,表示在该值的路径上第几层开始是未定义的,对于非空的值它是没有意义的,因为非空值在叶子节点是定义的,所有的父节点也肯定是定义的,因此它总是等于该列最大的definition levels。例如下面的schema。

message ExampleDefinitionLevel {
  optional group a {
   optional group b {
    optional string c;
   }
  }
 }

它包含一个列a.b.c,这个列的的每一个节点都是optional类型的,当c被定义时a和b肯定都是已定义的,当c未定义时我们就需要标示出在从哪一层开始时未定义的,如下面的值:

由于definition level只需要考虑未定义的值,而对于repeated类型的节点,只要父节点是已定义的,该节点就必须定义(例如Document中的DocId,每一条记录都该列都必须有值,同样对于Language节点,只要它定义了Code必须有值),所以计算definition level的值时可以忽略路径上的required节点,这样可以减小definition level的最大值,优化存储。

2.9 Parquet文件格式

Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。在HDFS文件系统和Parquet文件中存在如下几个概念。

  • HDFS块(Block):它是HDFS上的最小的副本单位,HDFS会把一个Block存储在本地的一个文件并且维护分散在不同的机器上的多个副本,通常情况下一个Block的大小为256M、512M等。
  • HDFS文件(File):一个HDFS的文件,包括数据和元数据,数据分散存储在多个Block中。
  • 行组(Row Group):按照行将数据物理上划分为多个单元,每一个行组包含一定的行数,在一个HDFS文件中至少存储一个行组,Parquet读写的时候会将整个行组缓存在内存中,所以如果每一个行组的大小是由内存大的小决定的,例如记录占用空间比较小的Schema可以在每一个行组中存储更多的行。
  • 列块(Column Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的算法进行压缩。
  • 页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式。

2.10 文件格式

通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个Mapper任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。Parquet文件的格式如下图所示。

上图展示了一个Parquet文件的内容,一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件,Footer length了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和该文件存储数据的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页,但是在后面的版本中增加。

在执行MR任务的时候可能存在多个Mapper任务的输入是同一个Parquet文件的情况,每一个Mapper通过InputSplit标示处理的文件范围,如果多个InputSplit跨越了一个Row Group,Parquet能够保证一个Row Group只会被一个Mapper任务处理。

2.11 映射下推(Project PushDown)

说到列式存储的优势,映射下推是最突出的,它意味着在获取表中原始数据时只需要扫描查询中需要的列,由于每一列的所有值都是连续存储的,所以分区取出每一列的所有值就可以实现TableScan算子,而避免扫描整个表文件内容。

在Parquet中原生就支持映射下推,执行查询的时候可以通过Configuration传递需要读取的列的信息,这些列必须是Schema的子集,映射每次会扫描一个Row Group的数据,然后一次性得将该Row Group里所有需要的列的Cloumn Chunk都读取到内存中,每次读取一个Row Group的数据能够大大降低随机读的次数,除此之外,Parquet在读取的时候会考虑列是否连续,如果某些需要的列是存储位置是连续的,那么一次读操作就可以把多个列的数据读取到内存。

2.12 谓词下推(Predicate PushDown)

在数据库之类的查询系统中最常用的优化手段就是谓词下推了,通过将一些过滤条件尽可能的在最底层执行可以减少每一层交互的数据量,从而提升性能,例如”select count(1) from A Join B on A.id = B.id where A.a > 10 and B.b < 100”SQL查询中,在处理Join操作之前需要首先对A和B执行TableScan操作,然后再进行Join,再执行过滤,最后计算聚合函数返回,但是如果把过滤条件A.a > 10和B.b < 100分别移到A表的TableScan和B表的TableScan的时候执行,可以大大降低Join操作的输入数据。

无论是行式存储还是列式存储,都可以在将过滤条件在读取一条记录之后执行以判断该记录是否需要返回给调用者,在Parquet做了更进一步的优化,优化的方法时对每一个Row Group的每一个Column Chunk在存储的时候都计算对应的统计信息,包括该Column Chunk的最大值、最小值和空值个数。通过这些统计值和该列的过滤条件可以判断该Row Group是否需要扫描。另外Parquet未来还会增加诸如Bloom Filter和Index等优化数据,更加有效的完成谓词下推。

在使用Parquet的时候可以通过如下两种策略提升查询性能:1、类似于关系数据库的主键,对需要频繁过滤的列设置为有序的,这样在导入数据的时候会根据该列的顺序存储数据,这样可以最大化的利用最大值、最小值实现谓词下推。2、减小行组大小和页大小,这样增加跳过整个行组的可能性,但是此时需要权衡由于压缩和编码效率下降带来的I/O负载。

2.13 性能

相比传统的行式存储,Hadoop生态圈近年来也涌现出诸如RC、ORC、Parquet的列式存储格式,它们的性能优势主要体现在两个方面:

  1. 更高的压缩比,由于相同类型的数据更容易针对不同类型的列使用高效的编码和压缩方式。
  2. 更小的I/O操作,由于映射下推和谓词下推的使用,可以减少一大部分不必要的数据扫描,尤其是表结构比较庞大的时候更加明显,由此也能够带来更好的查询性能。

上图是展示了使用不同格式存储TPC-H和TPC-DS数据集中两个表数据的文件大小对比,可以看出Parquet较之于其他的二进制文件存储格式能够更有效的利用存储空间,而新版本的Parquet(2.0版本)使用了更加高效的页存储方式,进一步的提升存储空间。

上图展示了Twitter在Impala中使用不同格式文件执行TPC-DS基准测试的结果,测试结果可以看出Parquet较之于其他的行式存储格式有较明显的性能提升。

上图展示了criteo公司在Hive中使用ORC和Parquet两种列式存储格式执行TPC-DS基准测试的结果,测试结果可以看出在数据存储方面,两种存储格式在都是用snappy压缩的情况下量中存储格式占用的空间相差并不大,查询的结果显示Parquet格式稍好于ORC格式,两者在功能上也都有优缺点,Parquet原生支持嵌套式数据结构,而ORC对此支持的较差,这种复杂的Schema查询也相对较差;而Parquet不支持数据的修改和ACID,但是ORC对此提供支持,但是在OLAP环境下很少会对单条数据修改,更多的则是批量导入。

2. RCFile

在新建Hive表时,可以使用stored as rcfile来指定hive文件的存储方式为RCFile。

2.1 RCFile文件结构

下图是一个RCFile的文件结构形式。

从上图可以看出:

  1. 一张表可以包含多个HDFS block

  2. 在每个block中,RCFile以行组(row group,类似于ORC中的stripe)为单位存储其中的数据。所谓行组是指在关系型数据块中,若干条记录组成的一个group。对于一张表来说,row group的大小是固定的。通过HDFS的block大小和row group的大小,能够确定一个block上可以容纳多少个row group。

  3. row group又由三个部分组成,包括一个用于在block中分隔两个row group的16字节的标志区,一个存储row group元数据信息的header,以及实际数据区。表中的实际数据以列为单位进行存储。

2.2 RCFile压缩方式

在存储RCFile时,会对每个row group的metadata header区和data区进行压缩。

在metadata header区中,记录了该row group中有多少记录,每个column总共有多少字节数,以及每个column中每一个field的字节数等信息。对metadata header区,使用RLE(Run Length Encoding)算法来压缩数据。需要读取指定column的记录时,可以根据这个metadata中记录的字节数等信息,很快定位到对应的数据。

对data区的数据压缩时,RCFile文件格式并不会将整个区域一起进行压缩,而是以列为单位进行Gzip压缩,这样的处理方式使得需要读取某些指定列的数据时,其他无关的列不需要进行读取。

2.3 RCFile的数据写入方式

由于目前HDFS只支持在文件末尾追加内容,无法随意修改hdfs文件中的数据。所以在使用RCFile文件的hive表中也只能在文件末尾写入新的记录。在向RCFile写入数据时,

  1. 为了避免频繁的写入操作,RCFile会为每一个column在内存中维持一个对应的column holder。当有记录插入到hive表中时,会把这一条记录的每个字段拆散存入到对应的column holder的末尾。伴随着这个操作的同时,会在metadata header中记录此次操作的相关信息。

  2. 上面的column holder当然是不能无限大的,为此RCFile设定了两个参数,当满足任何一个时,就会把column holder中的数据flush到磁盘上。这两个参数一个是写入记录数,另一个是column holder使用的内存大小。

  3. 记录写入完毕后,RCFile首先会将metadata header进行压缩。然后把每一个column单独进行压缩,最后将压缩好的数据flush到同一个row group中。

2.4 RCFile的数据读取和解压缩方式

当需要从一个row group读取数据时,RCFile并不会将整个row group中的数据都读入到内存中,需要读入的数据只包括metadata header,以及在语句中指定的那些column。

这两部分数据读入到内存中后,首先会将metadata header进行解压缩,并一直保存在内存中。接下来对加载到内存中的column数据,在RCFile中有一个lazy decompression的概念,这个的意思是说,column数据并不会在加载到内存中后马上进行解压缩,而是后续处理中的确需要读取这个column数据时解压缩过程才会执行。比如有一个sql语句,select a,b,c from table where a > 5;首先会对字段a解压缩,如果判断所有记录中没有a > 5的记录,那么字段b和字段c都不必要进行解压缩了。

2.5 RCFile的相关参数

参数 默认值 描述
hive.io.rcfile.record.buffer.size 4194304 设置row group的大小
hive.io.rcfile.record.interval 2147483647 row group中最大记录数

row group默认大小为4MB主要是因为row group不能太大,也不能太小。在Gzip压缩算法中,增大row group的大小能够提升压缩的性能。但是当row group的大小达到某个阈值时,继续增大row group并不能带来压缩性能的提升。并且,以上面的sql语句为例如果一个row group越大,其中保存的记录也就越多,这样该row group中出现a >5的记录的概率就越大,那么就越难使用到lazy decompression这一特性带来的性能提升。并且row group越大,消耗的内存也就越多。

这个大小限制在ORC文件格式中得到了改善。

3. ORCFile

ORC文件格式是从Hive-0.11版本开始的。关于ORC文件格式的 官方文档,以及基于 官方文档的翻译内容这里就不赘述了,有兴趣的可以仔细研究了解一下。本文接下来根据论文《Major Technical Advancements in Apache Hive》中的内容进行深入的研究。

3.1 ORC文件格式

ORC的全称是(Optimized Record Columnar),使用ORC文件格式可以提高hive读、写和处理数据的能力。ORC在RCFile的基础上进行了一定的改进,所以与RCFile相比,具有以下一些优势:

  1. ORC中的特定的序列化与反序列化操作可以使ORC file writer根据数据类型进行写出。

  2. 提供了多种RCFile中没有的indexes,这些indexes可以使ORC的reader很快的读到需要的数据,并且跳过无用数据,这使得ORC文件中的数据可以很快的得到访问。

  3. 由于ORC file writer可以根据数据类型进行写出,所以ORC可以支持复杂的数据结构(比如Map等)。

  4. 除了上面三个理论上就具有的优势之外,ORC的具体实现上还有一些其他的优势,比如ORC的stripe默认大小更大,为ORC writer提供了一个memory manager来管理内存使用情况。

ORC文件结构图

3.2 ORC数据存储方法

在ORC格式的hive表中,记录首先会被横向的切分为多个stripes,然后在每一个stripe内数据以列为单位进行存储,所有列的内容都保存在同一个文件中。每个stripe的默认大小为256MB,相对于RCFile每个4MB的stripe而言,更大的stripe使ORC的数据读取更加高效。

对于复杂数据类型,比如Map,ORC文件会将一个复杂数据类型字段解析成多个子字段。下表中列举了ORC文件中对于复杂数据类型的解析

Data type Chile columns
Array 一个包含所有数组元素的子字段
Map 两个子字段,一个key字段,一个value字段
Struct 每一个属性对应一个子字段
Union 每一个属性对应一个子字段

当字段类型都被解析后,会由这些字段类型组成一个字段树,只有树的叶子节点才会保存表数据,这些叶子节点中的数据形成一个数据流,如上图中的Data Stream。

为了使ORC文件的reader更加高效的读取数据,字段的metadata会保存在Meta Stream中。在字段树中,每一个非叶子节点记录的就是字段的metadata,比如对一个array来说,会记录它的长度。下图根据表的字段类型生成了一个对应的字段树。

字段树结构图

在Hive-0.13中,ORC文件格式只支持读取指定字段,还不支持只读取特殊字段类型中的指定部分。

使用ORC文件格式时,用户可以使用HDFS的每一个block存储ORC文件的一个stripe。对于一个ORC文件来说,stripe的大小一般需要设置得比HDFS的block小,如果不这样的话,一个stripe就会分别在HDFS的多个block上,当读取这种数据时就会发生远程读数据的行为。如果设置stripe的只保存在一个block上的话,如果当前block上的剩余空间不足以存储下一个strpie,ORC的writer接下来会将数据打散保存在block剩余的空间上,直到这个block存满为止。这样,下一个stripe又会从下一个block开始存储。

3.3 索引

在ORC文件中添加索引是为了更加高效的从HDFS读取数据。在ORC文件中使用的是稀疏索引(sparse indexes)。

在ORC文件中主要有两种用途的索引,

一个是数据统计(Data Statistics)索引,

一个是位置指针(Position Pointers)索引。

  • Data Statistics

ORC reader用这个索引来跳过读取不必要的数据,在ORC writer生成ORC文件时会创建这个索引文件。这个索引中统计的信息主要有记录的条数,记录的max, min, sum值,以及对text类型和binary类型字段还会记录其长度。对于复杂数据类型,比如Array, Map, Struct, Union,它们的子字段中也会记录这些统计信息。

在ORC文件中,Data Statistics有三个level。

  1. file level statistics

在ORC文件的末尾会记录文件级别的统计信息,会记录整个文件中columns的统计信息。这些信息主要用于查询的优化,也可以为一些简单的聚合查询比如max, min, sum输出结果。

  1. stripe level statistics

ORC文件会保存每个字段stripe级别的统计信息,ORC reader使用这些统计信息来确定对于一个查询语句来说,需要读入哪些stripe中的记录。比如说某个stripe的字段max(a)=10,min(a)=3,那么当where条件为a >10或者a <3时,那么这个stripe中的所有记录在查询语句执行时不会被读入。

  1. index group level statistics

为了进一步的避免读入不必要的数据,在逻辑上将一个column的index以一个给定的值(默认为10000,可由参数配置)分割为多个index组。以10000条记录为一个组,对数据进行统计。Hive查询引擎会将where条件中的约束传递给ORC reader,这些reader根据组级别的统计信息,过滤掉不必要的数据。如果该值设置的太小,就会保存更多的统计信息,用户需要根据自己数据的特点权衡一个合理的值。

  • Position Pointers

当读取一个ORC文件时,ORC reader需要有两个位置信息才能准确的进行数据读取操作。

  1. metadata streams和data streams中每个group的开始位置

由于每个stripe中有多个group,ORC reader需要知道每个group的metadata streams和data streams的开始位置。图1中右边的虚线代表的就是这种pointer。

  1. stripes的开始位置

由于一个ORC文件可以包含多个stripes,并且一个HDFS block也能包含多个stripes。为了快速定位指定stripe的位置,需要知道每个stripe的开始位置。这些信息会保存在ORC file的File Footer中。如图1中间位置的虚线所示。

3.4 文件压缩

ORC文件使用两级压缩机制,首先将一个数据流使用流式编码器进行编码,然后使用一个可选的压缩器对数据流进行进一步压缩。

一个column可能保存在一个或多个数据流中,可以将数据流划分为以下四种类型:

• Byte Stream

字节流保存一系列的字节数据,不对数据进行编码。

• Run Length Byte Stream

字节长度字节流保存一系列的字节数据,对于相同的字节,保存这个重复值以及该值在字节流中出现的位置。

• Integer Stream

整形数据流保存一系列整形数据。可以对数据量进行字节长度编码以及delta编码。具体使用哪种编码方式需要根据整形流中的子序列模式来确定。

• Bit Field Stream

比特流主要用来保存boolean值组成的序列,一个字节代表一个boolean值,在比特流的底层是用Run Length Byte Stream来实现的。

接下来会以Integer和String类型的字段举例来说明。

(1)Integer

对于一个整形字段,会同时使用一个比特流和整形流。比特流用于标识某个值是否为null,整形流用于保存该整形字段非空记录的整数值。

(2)String

对于一个String类型字段,ORC writer在开始时会检查该字段值中不同的内容数占非空记录总数的百分比不超过0.8的话,就使用字典编码,字段值会保存在一个比特流,一个字节流及两个整形流中。比特流也是用于标识null值的,字节流用于存储字典值,一个整形流用于存储字典中每个词条的长度,另一个整形流用于记录字段值。

如果不能用字典编码,ORC writer会知道这个字段的重复值太少,用字典编码效率不高,ORC writer会使用一个字节流保存String字段的值,然后用一个整形流来保存每个字段的字节长度。

在ORC文件中,在各种数据流的底层,用户可以自选ZLIB, Snappy和LZO压缩方式对数据流进行压缩。编码器一般会将一个数据流压缩成一个个小的压缩单元,在目前的实现中,压缩单元的默认大小是256KB。

3.5 内存管理

当ORC writer写数据时,会将整个stripe保存在内存中。由于stripe的默认值一般比较大,当有多个ORC writer同时写数据时,可能会导致内存不足。为了现在这种并发写时的内存消耗,ORC文件中引入了一个内存管理器。在一个Map或者Reduce任务中内存管理器会设置一个阈值,这个阈值会限制writer使用的总内存大小。当有新的writer需要写出数据时,会向内存管理器注册其大小(一般也就是stripe的大小),当内存管理器接收到的总注册大小超过阈值时,内存管理器会将stripe的实际大小按该writer注册的内存大小与总注册内存大小的比例进行缩小。当有writer关闭时,内存管理器会将其注册的内存从总注册内存中注销。

3.6 参数

参数名 默认值 说明
hive.exec.orc.default.stripe.size 25610241024 stripe的默认大小
hive.exec.orc.default.block.size 25610241024 orc文件在文件系统中的默认block大小,从hive-0.14开始
hive.exec.orc.dictionary.key.size.threshold 0.8 String类型字段使用字典编码的阈值
hive.exec.orc.default.row.index.stride 10000 stripe中的分组大小
hive.exec.orc.default.compress ZLIB ORC文件的默认压缩方式
hive.exec.orc.skip.corrupt.data false 遇到错误数据的处理方式,false直接抛出异常,true则跳过该记录

3.7 ORC文件格式实际案例分析

3.7.1 表结构

库名 + 表名:fileformat.test_orc

字段 类型
category_id string
product_id int
brand_id int
price double
category_id_2 string

在hive中命令 desc formatted fileformat.test_orc; 的结果如下图:

根据上图中的location信息,查看在HDFS上的文件:

3.7.2 查看dump文件

hive提供了一个–orcfiledump参数用于查看HDFS上ORC表格的文件信息,在hive-0.13版本中的使用方法为:


hive --orcfiledump 

其他版本的使用方法可以去官方文档中查找。

下面是命令 hive --orcfiledump /user/hive/warehouse/fileformat.db/test_orc/000000_0 的查询结果

2.7.3 dump文件分析

使用hql语句,统计出各字段的count, min, max, sum信息如下:

字段 COUNT MIN MAX SUM
category_id 1000000 5011 975673 4.0222868968E11
product_id 1000000 968 50997770 27158964508399
brand_id 999130 0 1026427 774991825568
price 1000000 -0.0092 358000.0 1.8953626711045265E8
category_id_2 1000000 5010 5996 5.183530839E9

从dump文件的图片中可以看出,大致分成四个部分:

1、表结构信息

记录整张表的记录数,压缩方式,压缩大小,以及表结构。在表结构部分,ORC将整张表的所有字段构造成一个大的struct结构。对应图1-ORC文件结构图中的Postscript部分。

2、Stripe统计信息

统计当前HDFS文件对应Stripe的信息,包括各个字段的count,min, max, sum信息。对于最外层的Struct,只统计其count值。由于这张表数据量不大,当前HDFS文件中只有一个Stripe。对应图1-ORC文件结构图中的Stripe Footer部分。

3、File统计信息

统计内容和第二部分一致,不过这里统计的整张表的每个字段count, min, max, sum信息。对应图1-ORC文件结构图中的FileFooter部分。

这里我们将dump文件中的统计信息,与各字段实际统计信息作对比。通过与上面表格中各字段统计信息对比,发现对于int类型和double类型的字段,min, max, sum的结果都是匹配的。但是对于string类型的字段,仅仅只有min, max统计结果一致,sum的结果不相同。

4、Stripe详细信息

统计各Stripe的offset,总记录行数等Stripe层次的信息。该Stripe中各字段的Index Data和Row Data,以及每个字段的编码方式。

前面一行Stripe: offset: 3 data: 7847351 rows: 1000000 tail: 132 index: 7936应该也是保存在FileFooter中,后面各个字段统计信息对应图1-ORC文件结构图中的Index Data和Row Data部分。

从dump文件中的数据可以看出,每个字段的ROW_INDEX以及DATA信息是保存在一块连续空间中的,这块文件从offset=3开始。这也说明图1-ORC文件结构图中Row Data区的数据紧随Index Data区数据之后。

Index Data数据统计:

起始位置 字段
3……21 STRUCT
22……1141 category_id
1142……3056 product_id
3057……5135 brand_id
5136……7201 price
7202……7938 category_id_2

Row Data数据统计:

起始位置 字段 描述
7939……59887 category_id 字段对应词条int流
59888……59898 category_id 词条长度int流
59899……60989 category_id 字典词条数据
60990……3525432 product_id 实际数据int流
3525433……3527085 brand_id 标识IF NULL的byte流
3527086……5708142 brand_id 实际数据int流
5708143……7855016 price double类型
7855017……7855212 category_id_2 字段对应词条int流
7855213……7855219 category_id_2 词条长度int流
7855220……7855289 category_id_2 字典词条数据

在ORC文件的int类型和string类型保存时,会有一个byte流用于记录字段的某个记录是否为null,根据统计只有brand_id 字段的count值不足100000条,也就是说除了brand_id 字段之外,其他字段中没有null值。所以在上面Row Data表中,只有brand_id有一个对应的IF NULL标识流。一个String类型,会将词条数据保存在字节流中,然后一个int流记录每个词条的长度,另外一个int流用于指定字段某个记录对应字典词条中的哪一个。

这部分最后记录了每一个字段的存储方式,统计如下

字段 类型 存储方式
STRUCT DIRECT
category_id String DICTIONARY_V2
product_id Int DIRECT_V2
brand_id Int DIRECT_V2
price Double DIRECT
category_id_2 String DICTIONARY_V2

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
hive mapjoin 内存溢出异常 hive mapjoin 内存溢出异常
1. 问题描述在hive中两张表进行join操作时有时会出现如下错误: Total MapReduce jobs = 4 2014-10-22 05:45:06 Starting to launch local task to p
2017-09-05
下一篇 
HDFS 中常见的数据格式 1.面向行的文件格式介绍 HDFS 中常见的数据格式 1.面向行的文件格式介绍
1. 概述hadoop 中的文件格式大致分为面向行与面向列两类 面向行 同一行的数据存储在一起, 即连续存储, 如 SequenceFile、 MapFile、Avro DataFile 都采用面向行的方式存储 如果只需要访问行的一
2017-08-29
  目录