SparkSQL读取HBase数据


介绍

这里的 SparkSQL 是指整合了 Hive 的 spark-sql cli,本质上就是通过Hive访问HBase表,具体就是通过hive-hbase-handler。

环境说明

hadoop-2.3.0-cdh5.0.0

apache-hive-0.13.1-bin

spark-1.4.0-bin-hadoop2.3

hbase-0.96.1.1-cdh5.0.0

测试集群,将Spark Worker部署在每台DataNode上,是为了最大程度的任务本地化,Spark集群为Standalone模式部署。

其中有三台机器上也部署了RegionServer。

这个部署情况对理解后面提到的任务本地化调度有帮助。

增加配置

  1. 拷贝以下HBase的相关jar包到Spark Master和每个Spark Worker节点上的$SPARK_HOME/lib目录下
$HBASE_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/htrace-core-2.01.jar
$HBASE_HOME/lib/protobuf-java-2.5.0.jar
$HBASE_HOME/lib/guava-12.0.1.jar
$HIVE_HOME/lib/hive-hbase-handler-0.13.1.jar
  1. 配置每个节点上的$SPARK_HOME/conf/spark-env.sh,将上面的jar包添加到SPARK_CLASSPATH
export SPARK_CLASSPATH=$SPARK_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/htrace-core-2.01.jar:
$SPARK_HOME/lib/protobuf-java-2.5.0.jar:
$SPARK_HOME/lib/guava-12.0.1.jar:
$SPARK_HOME/lib/hive-hbase-handler-0.13.1.jar:
${SPARK_CLASSPATH}
  1. 将hbase-site.xml拷贝至 ${HADOOP_CONF_DIR} ,由于 spark-env.sh 中配置了 Hadoop 配置文件目录 ${HADOOP_CONF_DIR} ,因此会将 hbase-site.xml 加载。

hbase-site.xml中主要是以下几个参数的配置:

<property>

<name>hbase.zookeeper.quorum</name>

<value>zkNode1:2181,zkNode2:2181,zkNode3:2181</value>

<description>HBase使用的zookeeper节点</description>

</property>

<property>

<name>hbase.client.scanner.caching</name>

<value>5000</value>

<description>HBase客户端扫描缓存,对查询性能有很大帮助</description>

</property>

另外还有一个参数:zookeeper.znode.parent=/hbase 是HBase在zk中的根目录,默认为/hbase,视实际情况进行配置。

  1. 重启Spark集群。

查询数据

hbase中有表tab_info , 数据如下:

hbase(main):025:0* scan 'tab_info'
ROW COLUMN+CELL
tab_info.com column=f1:c1, timestamp=1435624625198, value=name1
tab_info.com column=f1:c2, timestamp=1435624591717, value=name2
tab_info.com column=f2:c1, timestamp=1435624608759, value=age1
tab_info.com column=f2:c2, timestamp=1435624635261, value=age2
tab_info.com column=f3:c1, timestamp=1435624662282, value=job1
tab_info.com column=f3:c2, timestamp=1435624697028, value=job2
tab_info.com column=f3:c3, timestamp=1435624697065, value=job3
1 row(s) in 0.0350 seconds

进入spark-sql,使用如下语句建表:

CREATE EXTERNAL TABLE tab_info (  
rowkey string,  
f1 map<STRING,STRING>,  
f2 map<STRING,STRING>,  
f3 map<STRING,STRING>  
) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:,f2:,f3:")  
TBLPROPERTIES ("hbase.table.name" = "tab_info");

建好之后,就可以查询了:

spark-sql> select * from tab_info;
tab_info.com     {"c1":"name1","c2":"name2"}     {"c1":"age1","c2":"age2"}       {"c1":"job1","c2":"job2","c3":"job3"}
Time taken: 4.726 seconds, Fetched 1 row(s)
spark-sql> select count(1) from tab_info;
1
Time taken: 2.46 seconds, Fetched 1 row(s)
spark-sql> 

大表查询,消耗的时间和通过Hive用MapReduce查询差不多。

spark-sql> select count(1) from tab_info_hbase;
53609638                                                                        
Time taken: 335.474 seconds, Fetched 1 row(s)

在spark-sql中通过insert插入数据到HBase表时候报错:

INSERT INTO TABLE tab_info
SELECT 'row1' AS rowkey,
map('c3','name3') AS f1,
map('c3','age3') AS f2,
map('c4','job3') AS f3
FROM tab_info_a
limit 1;
  • 报错信息
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, 
most recent failure: Lost task 0.3 in stage 10.0 (TID 23, slave013.uniclick.cloud): 
java.lang.ClassCastException: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat
at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat$lzycompute(hiveWriterContainers.scala:74)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat(hiveWriterContainers.scala:73)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.getOutputName(hiveWriterContainers.scala:93)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.initWriters(hiveWriterContainers.scala:117)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.executorSideSetup(hiveWriterContainers.scala:86)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:99)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  • 解决办法

    问题分析

    这个问题是由于尝试将 HiveHBaseTableOutputFormat 强制转换为 HiveOutputFormat 引起的,这两个类并不兼容,所以会抛出 ClassCastException 异常。

    为了解决这个问题,你需要检查你的代码中是否存在类似下面的代码:

HiveOutputFormat outputFormat = (HiveOutputFormat) new HiveHBaseTableOutputFormat();

// 如果是这样,将其改为:

HiveHBaseTableOutputFormat outputFormat = new HiveHBaseTableOutputFormat();

或者使用 HiveHBaseStorageHandler,如下所示:

CREATE EXTERNAL TABLE hbase_table(key string, value string) STORED BY 'org.apache.hadoop.hive.hbase.HiveHBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:value")
TBLPROPERTIES ("hbase.table.name" = "hbase_table");

说明

Spark和Hadoop MapReduce一样,在任务调度时候都会考虑数据本地化,即”任务向数据靠拢”,尽量将任务分配到数据所在的节点上运行。

基于这点,tab_info_hbase为HBase中的外部表,Spark在解析时候,通过 org.apache.hadoop.hive.hbase.HBaseStorageHandler 获取到表 tab_info_hbase 在HBase中的 region 所在的 RegionServer,所以,在调度任务时候,首先考虑要往这三台节点上分配任务。

表 tab_info_hbase 共有10个region,因此需要10个map task来运行。

每台机器上Worker的实例为2个,每个Worker实例中运行的Executor为1个,因此,每台机器上运行两个Executor.

那么每个节点上各运行2个Executor,总共6个,Spark会在第一时间将这6个Task交给这6个Executor去执行(NODE_LOCAL Tasks)。

剩下4个Task,只能分配给其他Worker上的Executor,这4个Task为ANY Tasks。

后记

通过Hive和spark-sql去访问HBase表,只是为统计分析提供了一定的便捷性,个人觉得性能上的优势并不明显。

可能Spark通过API去读取HBase数据,性能更好些吧,以后再试。

另外,spark-sql有一点好处,就是可以先把HBase中的数据cache到一张内存表中,然后在这张内存表中,

通过SQL去统计分析,那就爽多了。


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
SparkSQL2.x  常用函数 SparkSQL2.x 常用函数
% expr1 % expr2 - 返回 expr1 除以 expr2 的余数。. 示例 > SELECT 2 % 1.8; 0.2 > SELECT MOD(2, 1.8); 0.2 & expr1 & expr
2018-09-15
下一篇 
Apache Spark 2.0中DataFrames 和 SQL 2 Apache Spark 2.0中DataFrames 和 SQL 2
本文第一部分使用了无类型的 DataFrame API,其中每行都表示一个Row对象。在下面的内容中,我们将使用更新的 DatasetAPI。Dataset 是在 Apache Spark 1.6 中引入的,并已在 Spark 2.0 中使
2018-09-07
  目录