1. HBase 介绍 HBase是一个开源的非关系型分布式数据库,它参考了谷歌的BigTable建模,实现的编程语言为Java。它是Apache软件基金会的Hadoop项目的一部分,运行于HDFS文件系统之上,为 Hadoop 提供类似于BigTable 规模的服务。因此,它可以容错地存储海量稀疏的数据。
HBase是一个高可靠、高性能、面向列、可伸缩的分布式数据库,是谷歌BigTable的开源实现,主要用来存储非结构化和半结构化的松散数据。HBase的目标是处理非常庞大的表,可以通过水平扩展的方式,利用廉价计算机集群处理由超过10亿行数据和数百万列元素组成的数据表。
2. HBase集群部署 HBase 部署文档地址:https://www.hnbian.cn/posts/fdea61c2.html#toc-heading-87
3.HBase 架构
上图描述了Hadoop 生态系统中的各层系统,HBase位于结构化存储层,Hadoop HDFS为HBase提供了高可靠性的底层存储支持,Zookeeper为HBase提供了稳定服务和故障转移机制。
此外,Pig和Hive还为HBase提供了高层语言支持,使得在HBase上进行数据统计处理变的非常简单。 Sqoop则为HBase提供了方便的RDBMS数据导入功能,使得传统数据库数据向HBase中迁移变的非常方便。
包含访问HBase的接口,Client维护着一些cache(缓存)来加快对HBase的访问,比如Region的位置信息
保证任何之后,集群中只有一个Hmaster(选举机制Master selete)
存储所有Region的寻址入口
实时监控HRegionServer的状态,将HRegionServer的上线下线信息实时通知给Hmaster
存储HBase的schema(元数据),包括有哪些table,每个table有哪些column family
为HRegionServer分配Region
负责HRegionServer的负载均衡
发现失效的HRegionServer 并重新分配其上的Region(高可靠性)
GFS上的垃圾文件回收
处理schema更新请求(进行ddl的操作)
HRegionServer 维护Hmaster 分配给他的Region,处理对这些Region的IO请求
HRegionServer负责切分在运行中变得过大的Region 可以看到,Client访问HBase上的数据过程并不需要HMaster的参与(寻址访问Zookepper和HRegionServer,读写HRegionServer),HMaster仅仅维护着table和Region的元数据信息,负载很低
按大小分割的,每个表开始只有一个region,随着数据不管插入表,region不断增大,当增大到一个阀值的时候 ,Hregion 会等分为两个新的region,当table中的行不断增多,就会有越来越多的reigon.
HregionServer负责分割region,HMaster负责迁移region
Hregion是HBase中分布式存储和负载均衡的最小单元。最小单元就表示不同的HRegion可以分布在不同的HRegionServer上,但是一个HRegion是不会拆分到多个Server上的
HRegion虽然是分布式存储的最小单元,但并不是存储的最小单元
事实上,HRegion由一个或者多个Store组成,每个store保存一个columns family。每个store又由一个memStore和0至多个StoreFile组成
StoreFile以HFile格式保存在HDFS上
(Hbase:meta
HBase:namespace)
HBase中有两张特殊的table,-ROOT-和.META.
-ROOT-:记录了.META.表的Region信息,-ROOT-只有一个Region
.META.:记录了用户创建的表的Region信息,.META.可以有多个Region
Zookepper中记录了-ROOT-表的location
Client 访问用户数据之前,需要首先访问Zookepper,然后访问-ROOT-表,接着访问.META.表,最后才能找到用户数据的位置去访问
Hbase的体系结构是一个主从式的结构,主节点Hmaster在整个集群当中只有一个在运行,从节点HRegionServer有很多个在运行,主节点Hmaster与从节点HRegionServer实际上指的是不同的物理机器,即有一个机器上面跑的进程是Hmaster,很多机器上面跑的进程是HRegionServer,Hmaster没有单点问题,Hbase集群当中可以启动多个Hmaster,但是通过zookeeper的事件处理机制保证整个集群当中只有一个Hmaster在运行。
Hamster:为 HRegionServer 分配 Region,负责 HRegionServer 的负载均衡,发现失效的 HRegionServer 并重新分配其上的 Region(高可靠性),GFS上的垃圾文件回收,处理schema更新请求(进行ddl的操作)
Client:包含访问 HBase 的接口,Client维护着一些cache(缓存)来加快对HBase的访问,比如 Region 的位置信息
Zookepper(协调者):保证任何时候,集群中只有一个 Hmaster,存储所有 Region 的寻址入口,实时监控 HRegionServer 的状态,将 HRegionServer 的上线下线信息实时通知给 Hmaster,存储HBase的schema(元数据),包括有哪些table,每个table有哪些column family
HRegionServer:内部管理类一系列 HRegion 对象,每个 HRegion 对应 Table 中的一个 Region 。
HRegion:由许多 Store 组成。
Store:是HBase的存储核心,由 MemStore 和 StoreFile 组成,每个Store包含一个MemStore和0到多个StoreFile。 Store 对应 Table 中的一个Column Family的存储,即一个Store 管理一个 Region 上的一个列族(CF)。
数据在写入时,首先写入预写日志(Write Ahead Log),每个HRegionServer服务的所有Region的写操作日志都存储在一个日志文件中。数据并非直接写入HDFS,而是等缓存到一定数量再批量写入,写入完成后在日志中做标记
MemStore:是一个有序的内存缓冲区,用户写入的数据首先放入MemStore,当 MemStore 满了以后Flush成一个 StoreFile(存储是应为HFile),当 StoreFile 数量到一定阈值,触发Compact合并,将多个 StoreFile 合并成一个 StoreFile 。StoreFiles合并后逐步形成越来越大的 StoreFile,当 Region 内所有的StoreFiles(HFile)的总大小超过阈值(hbase.hregion.max.filesize)即触发分裂,把当前的Region Split成两个Region,父Region下线,新Split的两个子Region被HMaster分配到合适的Region Server上,使得原先一个Region的压力分流到2个Region上。
4. HBase 数据存储细节
相当于关系表的主键,每一行数据的唯一标识。字符串、整数、二进制串都可以作为行键。HBase 中的数据按照RowKey排序后存储。访问Hbase table中的行,只有三种方式
1.通过单个row key
2.通过row key 的 range(范围 )
3.全表扫描
Column Family:简称CF,一个表在水平方向可以由一个到多个CF组成。一个CF可以由任意多个Column组成。Column是CF下的一组标签,可以在写数据时任意添加,因此CF支持动态扩展无须预先定义的Column的数量和类型。HBase中表的列非常稀疏,不同行的列个个数和类型都可以不同。此外每个CF都有独立的TTL(生存周期)。可以只对行上锁,对行的操作始终是原始的。
HBase中通过row和columns确定的为一个存贮单元称为cell。每个cell都保存着同一份数据的多个版本,版本通过时间戳来索引
Hbase中的表会切分成若干个region进行存储,当Table随着记录数不断增加而变大后,Table在行的方向上会被切分成多个Region,一个Region由[startkey,endkey) 表示,每个Region会被Master分散到不同的HRegionServer上面进行存储,类似于我的block块会被分散到不同的DataNode节点上面进行存储。下面是Hbase表中的数据与HRegionServer的分布关系。
5. HBase shell 详细内容查看:https://www.hnbian.cn/posts/ec19e895.html
6. HBase API 中的常用类 6.1 Hbaseconfihuration 作用对Hbase进行配置
用法示例
1 2 3 HBaseConfiguration hconfig = new HBaseConfiguration ;Hconfig.set("hbase.zookepper.property.clientPort" ,"2181" );
返回值
函数
描述
Void
addResource(Path file)
通过给定的路径所指的文件来添加资源
void
dear
清空所有已经设置的属性
string
get(String name)
获取属性名对应值
string
getBoolean(String name,boolean defaultValue)
获取为boolean类型的属性值,如果其属性值类型不为boolean,则返回其默认属性值
void
set(String name,String value)
通过属性名来设置值
void
setBoolean(String name,boolean value)
设置boolean类型的属性值
6.2 HBaseAdmin 包名:org.apache.hadoop.hbase.client.HBaseAdmin
作用:提供了一个接口来管理HBase数据库的表信息,它提供的方法包括:创建表,删除表,列出表项,使表无效,以及添加活删除表列族成员等
用法示例
1 2 3 HBaseAdmin admin = new HBaseAdmin (config);admin.disableTable("tableName" );
返回值
函数
描述
void
addColumn(String tableName,HColumnDescnptor column)
向一个已经存在的表添加列
void
checkHBaseAvailable(HBaseConfiguration conf)
静态函数,查看Hbase 是否处于运行状态
void
createTable(HTableDescriptor desc)
创建一个表,同步操作
void
deleteTable(byte[] tableName)
删除一个已经存在的表
void
enableTable(byte[] tableName)
使表处于有效状态
void
disableTable(byte[] tableName)
使表处于无效状态
HTableDescnptor
listTables
列出所有用户控件表项
void
modifyTable(byte[] tableName,HTableDescnptor htd)
修改表的模式,是异步的操作,可能需要话费一定的时间
boolean
tableExists(String tableName)
检查表是否存在
6.3 HtableDescriptor 包名:org.apache.hadoop.hbase.HTableDescriptor
作用:包含了表的名字及其对应的列族
用法示例:
1 2 3 HtableDescriptor htd = new HtableDescriptor ;Htd.addFamily(new HcolumnDescriptor ("family" ));
返回值
函数
描述
void
addFamily(HcolumnDescriptor hcd)
添加一个列族
HcolumnDescriptor
removeFamily(byte[] column)
移除一个列族
Bte[]
getName;
获取表的名字
Byte[]
getValue;
获取属性的名字
void
setValue(String key,String value)
设置属性的值
6.4 HColumnDescriptor 包名:org.apache.hadoop.hbase.HColumnDescriptor
作用:维护着关于列族的信息,比如版本号,压缩设置等,它通常在创建表或者为表太难家列族的时候使用,列族被创建后不能直接修改,只能通过删除然后从新创建的方式,列族被删除的时候,列族里面的数据会同时被删除
用法示例:
1 2 3 4 5 HtableDescriptor htd = new HtableDescriptor ("tableName" );HColumnDescriptor col = new HColumnDescriptor ("content" );Htd.addFamily(col);
返回值
函数
描述
Byte[]
getName
获取列族的名字
Byte[]
getValue(byte[] key)
获取对应属性的值
void
setValue(String key,String value)
设置对应属性的值
6.5 Htable 包名:org.apache.hadoop.hbase.client.HTable
作用:可以用来和HBase表直接通信,此方法对于更新操作来说是非线程安全的
用法示例:
1 2 3 Htable table = new Htable (conf,Bytes.toBytes("tableName" ));ResultScanner scanner = table.getScanner("family" );
返回值
函数
描述
void
checkAndput(byte[] row,byte[] family,byte[] qualifier,byte[] value,Put put)
自动的检查 row/family/qualifier 是否与给定的值匹配
void
Close
释放所有的资源或挂起内部缓冲区中的更新
boolean
Exists(Get get)
获取get 实例所指定的值是否存在于htable的实例中
result
Get(Get get)
获取指定行的某些单元格所对应的值
Byte[][]
getEndKeys
获取当前一打开的表每个区域的结束键值
ResultScanner
getScanner(byte[] family)
获取当前给定列族的scanner实例
Byte[]
getTableName;
获取表名
Static boolean
isTableEnabled(HBaseConfiguration conf,String tableName);
检查表是否有效
void
Put(put put)
向表中添加值
6.6 Put 包名:org.apache.hadoop.hbase.client.Put
作用:用来对表执行单个添加操作
用法示例:
1 2 3 4 5 6 7 HTable table = new HTable (conf,Bytes.toBytes(tablename));Put p = new Put (brow);p.add(family,qualifier,value); table.put(p);
返回值
函数
描述
void
checkAndput(byte[] row,byte[] family,byte[] qualifier,byte[] value,Put put)
自动的检查 row/family/qualifier 是否与给定的值匹配
void
Close
释放所有的资源或挂起内部缓冲区中的更新
boolean
Exists(Get get)
获取get 实例所指定的值是否存在于htable的实例中
result
Get(Get get)
获取指定行的某些单元格所对应的值
Byte[][]
getEndKeys
获取当前一打开的表每个区域的结束键值
ResultScanner
getScanner(byte[] family)
获取当前给定列族的scanner实例
Byte[]
getTableName;
获取表名
Static boolean
isTableEnabled(HBaseConfiguration conf,String tableName);
检查表是否有效
void
Put(put put)
向表中添加值
6.7 Get 包名:org.apache.hadoop.hbase.client.Get
作用:用来获取单个行的相关信息
用法示例:
1 2 3 4 5 HTable table = new HTable (conf, Bytes.toBytes(tablename));Get g = new Get (Bytes.toBytes(row));table.get(g);
返回值
函数
描述
get
addColumn(byte[] family,byte[] qualifier)
获取指定列族和列的修饰符对应的列
Get
addFamily(byte[] family)
通过指定的列族获取其对应的所有列
Get
setTimeRange(Long.minStamp.long maxStamp)
获取指定区间的版本号
Get
setFilter(Filter filter)
当执行get操作时设置服务器端的过滤器
6.8 Result 包名:org.apache.hadoop.hbase.client.Result
作用:存储get 或者scann操作后获取表的单行值,使用此类提供的方法可以直接获取值或者中map接口(key-value对)
返回值
函数
描述
boolean
containsColumn(byte[] family,byte[]qualifier)
检查指定的列是否存在
NavigableMap<byte[],byte[]>
getFamilyMap(byte[] family)
获取对应列族所包含的修饰符与值的键值对
Byte[]
getValue(byte[] family,byte[] qualifier)
获取对应列的最新值
6.9 ResultScanner 包名:org.apache.hadoop.hbase.client.ResultScanner
作用:存储Get或者Scan操作后获取表的单行值。使用此类提供的方法可以直接
获取值或者各种Map结构(key-value对)
返回值
函数
描述
vlid
Close
关闭scanner并释放分配给它的资源
Result
Next
获取下一行的值
6.10 HTablePool 包名:org.apache.hadoop.hbase.client.HTablePool
作用:可以解决HTable存在的线程不安全问题,同时通过维护固定数量的HTable对象,能够在程序运行期间复用这些HTable资源对象
说明:
HTablePool可以自动创建HTable对象,而且对客户端来说使用上是完全透明的,可以避免多线程间数据并发修改问题。
HTablePool中的HTable对象之间是公用Configuration连接的,能够可以减少网络开销。
HTablePool的使用很简单:每次进行操作前,通过HTablePool的getTable方法取得一个HTable对象,然后进行put/get/scan/delete等操作,最后通过
HTablePool的putTable方法将HTable对象放回到HTablePool中。
7. Java 操作 HBase示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 package hbase;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.MasterNotRunningException;import org.apache.hadoop.hbase.ZooKeeperConnectionException;import org.apache.hadoop.hbase.client.Get;import org.apache.hadoop.hbase.client.HBaseAdmin;import org.apache.hadoop.hbase.client.HConnection;import org.apache.hadoop.hbase.client.HConnectionManager;import org.apache.hadoop.hbase.client.HTableInterface;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;import org.apache.hadoop.hbase.util.Bytes;public class HbaseConnection { private String rootDir; private String zkServer; private String port; private Configuration conf; private HConnection hConn = null ; private HbaseConnection (String rootDir,String zkServer,String port) { System.out.println("rootDir" +rootDir); System.out.println("zkServer" +zkServer); System.out.println("port" +port); try { this .rootDir = rootDir; this .zkServer = zkServer; this .port = port; conf = HBaseConfiguration.create; conf.set("hbase.rootdir" , rootDir); conf.set("hbase.zookepper.quorum" , zkServer); conf.set("hbase.zookeeper.quorum" , zkServer); conf.set("hbase.zookepper.property.clientPort" , port); } catch (IOException e) { e.printStackTrace; } } private void createTable (String tablename,List<String> cols) { try { HBaseAdmin admin = new HBaseAdmin (conf); if (admin.tableExists(tablename)){ throw new IOException ("table exists" ); }else { HTableDescriptor tableDesc = new HTableDescriptor (tablename); for (String col:cols){ HColumnDescriptor colDesc = new HColumnDescriptor (col); colDesc.setCompressionType(Algorithm.GZ); colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF); tableDesc.addFamily(colDesc); } admin.createTable(tableDesc); } } catch (MasterNotRunningException e) { e.printStackTrace; } catch (ZooKeeperConnectionException e) { e.printStackTrace; } catch (IOException e) { e.printStackTrace; } } private void saveData (String tablename,List<Put> puts) { try { HTableInterface table = hConn.getTable(tablename); table.put(puts); table.setAutoFlush(false ); table.flushCommits; } catch (IOException e) { e.printStackTrace; } } private Result getData (String tablename,String rowKey) { try { HTableInterface table = hConn.getTable(tablename); Get get = new Get (Bytes.toBytes(rowKey)); return table.get(get); } catch (IOException e) { e.printStackTrace; } return null ; } public static void main (String args[]) { String rootDir = "hdfs://ns1/hbase" ; String zkServer="had5,had6,had7" ; String port="2181" ; HbaseConnection conn = new HbaseConnection (rootDir,zkServer,port); Result result = conn.getData("stu2" ,"tom" ); format(result); } public static void format (Result re) { String rowKey = Bytes.toString(re.getRow); KeyValue[] kvs = re.raw; for (KeyValue kv:kvs){ String family = Bytes.toString(kv.getFamily); String qualifier = Bytes.toString(kv.getQualifier); System.out.println(rowKey); System.out.println(family); System.out.println(qualifier); } } }
8. HBase扫描器 8.1扫描器简介 HBase 在扫描数据的时候,使用Scanner表扫描器
Htable 通过Scan实例,调用getScanner(scan) 来获取扫描器.可以配置扫描器起止位,以及其他过滤条件
通过迭代器返回查询结果,使用起来虽然不是很方便,不过并不复杂.
但是这里有一点可能被忽略的地方.就是返回scanner迭代器,每次调用next的获取下一条记录的时候,默认配置下会访问一次RegioServer,这在网络条件不是很好的情况下,对性能是很大的影响.
建议配置扫描器缓存
8.2扫描器缓存 Hbase.client.scanner.caching配置项可以设置Hbase scanner 一次从服务端抓去的数据条数,默认情况下一次是一条
通过将其设置成一个合理的值,可以减少scan的过程中next的开销时间,代价是scanner需要客户端的内存来维持这些被cache的行记录
有三个地方可以进行配置
1.在HBASE的conf配置文件中进行配置
2.通过调用Htable.setScannerCaching(int scannerCaching);进行配置
3.通过调用Scan.setCaching(int caching);//进行配置,1,2,3的优先级越来越高
8.3 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void hbaseScan (String tablename) { try { HTableInterface table = hConn.getTable(tablename); Scan scan = new Scan ; scan.setCaching(1000 ); ResultScanner rs = table.getScanner(scan); for (Result r:rs){ format(r); } } catch (IOException e) { e.printStackTrace; } }
9. 过滤器 过滤器详细介绍查看文档:https://www.hnbian.cn/posts/6e925cfa.html
10. 协处理器 Hbase作为列式数据库最经常被人诟病的特性包括:
无法轻易建立”二级索引”,难以执行求和,计数,排序等操作.
比如,在小于0.92的版本中,统计表的总行数,需要使用counter方法,执行一次MapReduce job 才能得到,虽然Hbase在数据存储层中集成了MapReduce,能够有效用于表的分布式计算,
然而在很多情况下,做一些简单的相加或者聚合计算的时候,如果直接将计算过程放置在server端,能够减少通讯开销,从而获得很好的性能提升.于是,Hbase在0.92版本之后引入了协处理器(coprocessors),实现一些激动人心的新特性:能够轻易建立二级索引,复杂过滤器以及访问控制等.
Hbase协处理器的灵感来自于jeff dean 09年的演讲 ,它根据该演讲实现了类似于bigtable的协处理器,包括以下特性
1.每个表服务器的任务子表都可以运行代码
2.客户端的高层调用接口(客户端能够直接访问数据表的行地址,多行读写会自动分片成多个并行的RPC调用)
3.提供一个非常灵活的,可用于简历分布式服务的数据模型
4.能够自动化扩展,负载均衡,应用请求路由
Hbase的协处理器灵感来自bigtable,但是实现细节不尽相同。Hbase建立了一个框架,它为用户提供类库和运行时环境,使他们的代码能够在Hbase region server 和master上处理。
协处理器分为两种类型,
1.系统协处理器可以全局导入region server上的所有数据表,
2.表协处理器即是用户可以执行一张表使用协处理器。
协处理器框架为了更好支持其行为灵活性,提供了两种各不同方面的插件。
1.一个是观察者(observer),类似于关系型数据库的触发器。
2.另一个是终端(endpoint),动态的终端有点类似存储过程。
10.1 Observer 观察者的设计意图是允许用户通过插入代码来重载协处理器框架的upcall方法,而具体的事件触发的callback方法有Hbase的核心代码来执行。协处理器框架处理所有的callback调用细节,协处理器自身只需要插入添加或者改变的功能。
以Hbase0.92版本为例,它提供了三种观察者接口:
RegionObserver:提供客户端的数据操纵事件钩子:Get,Put,Delete,Scan等
WALObserver:提供WAL相关操作钩子
MasterObserver:提供DDL-类型的操作钩子,如创建,删除,修改数据表等
这些接口可以同事使用在同一个地方,按照不同优先级顺序执行,用户可以任意基于协处理器实现复杂的Hbase功能层,Hbase有很多种事件可以触发观察者方法,这些事件与方法从Hbase0.92版本起,都会集成在Hbase API中,不过这些API可能会由于各种原因有些改动,不同版本的接口改动较大。
10.1.1 ObServer 使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package hbase;import java.io.IOException;import java.util.List;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Get;import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;import org.apache.hadoop.hbase.coprocessor.ObserverContext;import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;import org.apache.hadoop.hbase.util.Bytes;public class RegionObserverTest extends BaseRegionObserver { private static byte [] find_rowkey = Bytes.toBytes("101" ); @Override public void preGet (ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<KeyValue> result) throws IOException { if (Bytes.equals(get.getRow, find_rowkey)){ KeyValue kv = new KeyValue (get.getRow,Bytes.toBytes("time" ),Bytes.toBytes("time" ),Bytes.toBytes(System.currentTimeMillis)); result.add(kv); } } }
1 hadoop fs -put basehbaseobserver.jar /
在Hbase shell客户端 添加observer 协处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 hbase(main):014:0> disable 'persion' 0 row(s) in 1.3640 seconds hbase(main):015:0> alter 'persion' ,'coprocessor' =>'hdfs://ns1/basehbaseobserver.jar|hbase.RegionObserverTest||' Updating all regions with the new schema... 1/1 regions updated. Done. 0 row(s) in 1.1640 seconds hbase(main):016:0> enable 'persion' 0 row(s) in 2.4270 seconds hbase(main):019:0> describe 'persion' DESCRIPTION ENABLED 'persion' , {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://ns1/basehbaseobserver.jar|hbase.RegionObserverTest||' }, true {NAME => 'fam1' , DATA_BLOCK_ENCODING => 'DIFF' , BLOOMFILTER => 'ROW' , REPLICATION_SCOPE => '0' , COMPRESSION => 'GZ ' , VERSIONS => '1' , TTL => '2147483647' , MIN_VERSIONS => '0' , KEEP_DELETED_CELLS => 'false' , BLOCKSIZE => '65536' , IN_MEMORY => 'false' , BLOCKCACHE => 'true' }, {NAME => 'fam2' , DATA_BLOCK_ENCODING => 'DIFF' , BLOOMFILTER => 'ROW' , REPLICATION_SCOPE => '0' , COMPRESSION => 'GZ' , VERSIONS => '1' , TTL => '2147483647' , MIN_VERSIONS => '0' , KEEP_D ELETED_CELLS => 'false' , BLOCKSIZE => '65536' , IN_MEMORY => 'false' , BLOCKCACHE => 'true' } 1 row(s) in 0.0300 seconds hbase(main):020:0>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package hbase;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Get;import org.apache.hadoop.hbase.client.HConnection;import org.apache.hadoop.hbase.client.HConnectionManager;import org.apache.hadoop.hbase.client.HTableInterface;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.util.Bytes;public class CallObserverTest { private static HConnection hConn = null ; public static void main (String args[]) throws Throwable{ String rootDir = "hdfs://ns1/hbase" ; String zkServer="had5,had6,had7" ; String port="2181" ; Configuration conf = HBaseConfiguration.create; conf.set("hbase.rootdir" , rootDir); conf.set("hbase.zookeeper.quorum" , zkServer); conf.set("hbase.zookepper.property.clientPort" , port); hConn = HConnectionManager.createConnection(conf); HTableInterface table = hConn.getTable("persion" ); Get get = new Get (Bytes.toBytes("101" )); Result re = table.get(get); String rowKey = Bytes.toString(re.getRow); KeyValue[] kvs = re.raw; for (KeyValue kv:kvs){ String family = Bytes.toString(kv.getFamily); String qualifier = Bytes.toString(kv.getQualifier); System.out.println(rowKey); System.out.println(family); System.out.println(qualifier); System.out.println(Bytes.toString(kv.getValue)); } } }
10.2 EndPoint 终端是动态RPC插件接口,它的实现代码被安装在服务器端,从而能够通过Hbase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态的接口,他们可以在任意时候调用一个终端,它们的实现代码会被目标region远程执行,结果会返回到终端,用户可以结合使用这些强大的插件接口,为Hbase添加全新的特性,终端的使用,如下面流程所示:
定义一个新的protocol接口必须继承CoprocessorProtocol
实现终端接口,该实现会被导入region环境执行。
继承抽象类BaseEndPointCoprocessor
在客户端,终端可以被两个新的Hbase client API调用。
单个region:HTableInterface.coprocessorProxy(Class protocol,byte[] row).
regions区域:HTableInterface.coprocessorExec(Class protocol,byte[] startKey,byte[] endKey,Batch.Call<T,R>callable)
10.2.1 设置 EndPotin 的三种方式
启动全局aggregation,能够操作所有表上的数据,通过修改Hbase-site.xml这个文件来实现,只需要添加如下代码:
1 2 3 4 5 6 7 <property > <name > hbase.coprocessor.user.region.classes</name > <value > org.apache.hadoop.hbase.coprocessor.RowCountEndPoint</value > </property >
启用表aggregation,只对特定的表生效,通过Hbase shell来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 hbase>disable 'mytable' hbase>alter 'mytable' METHOD=> 'table_att' ,'coprocessor' =>'|org.apache.hadoop.hbase.coprocessor.RowCountEndpoint||' Hbase>enable 'mytable' hbase(main):006:0> disable 'persion' 0 row(s) in 1.4850 seconds hbase(main):007:0> alter 'persion' , 'coprocessor' =>'|org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint||' Updating all regions with the new schema... 1/1 regions updated. Done. 0 row(s) in 1.4330 seconds hbase(main):008:0> enable 'persion' 0 row(s) in 0.4800 seconds hbase(main):005:0> alter 'persion' , METHOD => 'table_att_unset' ,NAME=>'coprocessor$1' Updating all regions with the new schema... 0/1 regions updated. 1/1 regions updated. Done. 0 row(s) in 3.0840 seconds hbase(main):006:0> describe 'persion'
API调用
1 2 3 HTableDescriptor htd = new HTableDescriptor ("testTable" );Htd.setValue("CORPROCESSOR$1" ,path.toString+"|" +RowCountEndPoint.class.getCanonicalName+"|" +Coprocessor.Priority.USER);
几点说明:
1.协处理器配置的加载顺序:先加载配置文件中定义的协处理器,后加载表描述符中的协处理器
2.COPROCESSOR$ 中定义了加载的顺序
3.协处理器的加载格式:
1 alter 'tablename' ,METHOD=>'table_att' , 'coprocessor1' =>'hdfs:///foo.jar|Com.foo.fooregion.Observer|1001|arg1=1|arg2=2'
10.2.2 Endpoint 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 endpoint协处理器功能实现代码 import com.google.protobuf.RpcCallback;import com.google.protobuf.RpcController;import com.google.protobuf.Service;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.CellUtil;import org.apache.hadoop.hbase.Coprocessor;import org.apache.hadoop.hbase.CoprocessorEnvironment;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.coprocessor.CoprocessorException;import org.apache.hadoop.hbase.coprocessor.CoprocessorService;import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountRequest;import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountResponse;import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountResponse.Builder;import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.RowCountService;import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;import org.apache.hadoop.hbase.protobuf.ResponseConverter;import org.apache.hadoop.hbase.regionserver.HRegion;import org.apache.hadoop.hbase.regionserver.InternalScanner;import org.apache.hadoop.hbase.util.Bytes;public class RowCountEndpoint extends ExampleProtos .RowCountService implements Coprocessor , CoprocessorService { private RegionCoprocessorEnvironment env; public Service getService { return this ; } public void getRowCount (RpcController controller, ExampleProtos.CountRequest request, RpcCallback<ExampleProtos.CountResponse> done) { Scan scan = new Scan ; scan.setFilter(new FirstKeyOnlyFilter ); ExampleProtos.CountResponse response = null ; InternalScanner scanner = null ; try { scanner = this .env.getRegion.getScanner(scan); List results = new ArrayList ; boolean hasMore = false ; byte [] lastRow = null ; long count = 0L ; do { hasMore = scanner.next(results); for (Cell kv : results) { byte [] currentRow = CellUtil.cloneRow(kv); if ((lastRow == null ) || (!Bytes.equals(lastRow, currentRow))) { lastRow = currentRow; count += 1L ; } } results.clear; }while (hasMore); response = ExampleProtos.CountResponse.newBuilder.setCount(count).build; } catch (IOException ioe) { ResponseConverter.setControllerException(controller, ioe); } finally { if (scanner != null ) try { scanner.close; } catch (IOException ignored) { } } done.run(response); } public void getKeyValueCount (RpcController controller, ExampleProtos.CountRequest request, RpcCallback<ExampleProtos.CountResponse> done) { ExampleProtos.CountResponse response = null ; InternalScanner scanner = null ; try { scanner = this .env.getRegion.getScanner(new Scan ); List results = new ArrayList ; boolean hasMore = false ; long count = 0L ; do { hasMore = scanner.next(results); for (Cell kv : results) { count += 1L ; } results.clear; }while (hasMore); response = ExampleProtos.CountResponse.newBuilder.setCount(count).build; } catch (IOException ioe) { ResponseConverter.setControllerException(controller, ioe); } finally { if (scanner != null ) try { scanner.close; } catch (IOException ignored) { } } done.run(response); } public void start (CoprocessorEnvironment env) throws IOException { if ((env instanceof RegionCoprocessorEnvironment)) this .env = ((RegionCoprocessorEnvironment)env); else throw new CoprocessorException ("Must be loaded on a table region!" ); } public void stop (CoprocessorEnvironment env) throws IOException { } }
10.2.3 Java 调用 Endpoint 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package hbase;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.coprocessor.Batch;import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;import org.apache.hadoop.hbase.ipc.ServerRpcController;import java.io.IOException;import java.util.Map;import org.apache.hadoop.conf.Configuration;public class ComproessorRowcount { public static void main (String args[]) throws Throwable{ String rootDir = "hdfs://ns1/hbase" ; String zkServer="had5,had6,had7" ; String port="2181" ; Configuration conf = HBaseConfiguration.create; conf.set("hbase.rootdir" , rootDir); conf.set("hbase.zookeeper.quorum" , zkServer); conf.set("hbase.zookepper.property.clientPort" , port); HTable table = new HTable (conf,"persion" ); final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance; Map<byte [],Long> results = table.coprocessorService( ExampleProtos.RowCountService.class, null , null , new Batch .Call<ExampleProtos.RowCountService,Long> { public Long call (ExampleProtos.RowCountService counter) throws IOException{ ServerRpcController controller = new ServerRpcController ; BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback <ExampleProtos.CountResponse>; counter.getRowCount(controller, request, rpcCallback); ExampleProtos.CountResponse response = rpcCallback.get; if (controller.failedOnException){ throw controller.getFailedOn; } return (response !=null && response.hasCount)? response.getCount : 0 ; } }); Long sum = 0L ; int count = 0 ; for (Long l:results.values){ sum+=l; count ++; } System.out.println("row count==>" +sum); System.out.println("ragin count==>" +count); } }
11. MapReduce on Hbase
可以使用MapReduce的方法操作Hbase数据库
Hadoop MapReduce提供了相关的API,可以与Hbase数据库无缝连接
API link:http://hbase.apache.org/devapidocs/index.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package hbase;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.HConnection;import org.apache.hadoop.hbase.client.HConnectionManager;import org.apache.hadoop.hbase.client.HTableInterface;import org.apache.hadoop.hbase.client.Mutation;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;public class HbaseMR { public static void main (String args[]) throws IOException, ClassNotFoundException, InterruptedException{ String rootDir = "hdfs://ns1/hbase" ; String zkServer="had5,had6,had7" ; String port="2181" ; Configuration conf = HBaseConfiguration.create; conf.set("hbase.rootdir" , rootDir); conf.set("hbase.zookeeper.quorum" , zkServer); conf.set("hbase.zookepper.property.clientPort" , port); Job job = new Job (conf,"mr on hbase" ); job.setJarByClass(HbaseMR.class); Scan sc = new Scan ; sc.setCaching(100 ); TableMapReduceUtil.initTableMapperJob("persion" , sc,HMapper.class, Text.class, Text.class, job); TableMapReduceUtil.initTableReducerJob("result" , HReduce.class, job); job.waitForCompletion(true ); } } class HMapper extends TableMapper <Text,Text>{ @Override protected void map (ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException { Text k = new Text (Bytes.toString(key.get)); Text v = new Text (value.getValue(Bytes.toBytes("fam1" ), Bytes.toBytes("col1" ))); System.out.println(k); System.out.println(v); context.write(v, k); } } class HReduce extends TableReducer <Text,Text,ImmutableBytesWritable>{ @Override protected void reduce (Text key, Iterable<Text> values, Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException { Put put = new Put (Bytes.toBytes(key.toString)); for (Text value:values){ put.add(Bytes.toBytes("f1" ),Bytes.toBytes(value.toString),Bytes.toBytes(value.toString)); System.out.println(put); } context.write(null , put); } }
12. Hbase 二级索引
MapReduce方案
Coprocessor 方案
solr+hbase 方案
12.1 MapReduce 方案 indexBuilder:利用MR的方式构建Index
优先:并发批量构建index
缺点:不能实时构建index
举例:
原表:
row 1 f1:name zhangsan
row 2 f1:name lisi
row 3 f1:name wangwu
索引表
row zhangsan f1:name 1
row lisi f1:name 2
row wangwu f1:name 3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 package hbase;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.Set;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Mutation;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;import org.apache.hadoop.hbase.mapreduce.TableInputFormat;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.util.GenericOptionsParser;public class IndexBuilderByMR { @SuppressWarnings("deprecation") public static void main (String args[]) throws IOException, ClassNotFoundException, InterruptedException{ String rootDir = "hdfs://ns1/hbase" ; String zkServer="had5,had6,had7" ; String port="2181" ; Configuration conf = HBaseConfiguration.create; conf.set("hbase.rootdir" , rootDir); conf.set("hbase.zookeeper.quorum" , zkServer); conf.set("hbase.zookepper.property.clientPort" , port); String []otherArgs = new GenericOptionsParser (conf, args).getRemainingArgs; if (otherArgs.length<3 ){ System.out.println("参数传递异常" ); System.exit(-1 ); } String tableName = otherArgs[0 ]; String columnFamily = otherArgs[1 ]; conf.set("tableName" , tableName); conf.set("columnFamily" , columnFamily); String []qualifier = new String [otherArgs.length-2 ]; for (int i=0 ;i<qualifier.length;i++){ qualifier[i]=otherArgs[i+2 ]; } conf.setStrings("qualifiers" , qualifier); Job job = new Job (conf,tableName); job.setJarByClass(IndexBuilderByMR.class); job.setMapperClass(HMapper2.class); job.setNumReduceTasks(0 ); job.setInputFormatClass(TableInputFormat.class); job.setOutputFormatClass(MultiTableOutputFormat.class); Scan sc = new Scan ; sc.setCaching(100 ); TableMapReduceUtil.initTableMapperJob(tableName, sc,HMapper2.class, ImmutableBytesWritable.class, Put.class, job); job.waitForCompletion(true ); } } class HMapper2 extends TableMapper <ImmutableBytesWritable,Put>{ Map<byte [],ImmutableBytesWritable> indexes = new HashMap ; private String familyName; @Override protected void map (ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException { Set<byte []> set = indexes.keySet; for (byte [] k :set){ ImmutableBytesWritable indexTableName = indexes.get(k); byte [] val = value.getValue(Bytes.toBytes(familyName), k); if (val !=null ){ Put put = new Put (val); put.add(Bytes.toBytes("f1" ),Bytes.toBytes("id" ),key.get); context.write(indexTableName, put); } } } @Override protected void setup (Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration; String tableName = conf.get("tableName" ); String familyName = conf.get("familyName" ); String qualifiers[] = conf.getStrings("qualifiers" ); for (String q:qualifiers){ indexes.put(Bytes.toBytes(s), new ImmutableBytesWritable (Bytes.toBytes(tableName+"-" +q))); } super .setup(context); } }
12.2 基于solr的实时查询
hbase 提供海量数据存储
solr提供索引与构建查询
Hbase indexer 提供自动化构建索引(从Hbase到solr)