Apache Phoenix的Join操作和优化


1. 数据说明

  • Orders表:
OrderID CustomerID ItemID Quantity Date
1630781 C004 I001 650 09-01-2013
1630782 C003 I006 2500 09-02-2013
1630783 C002 I002 340 09-03-2013
1630784 C004 I006 1260 09-04-2013
1630785 C005 I003 1500 09-05-2013

数据保存到 Orders.csv,内容格式为:

1630781,C004,I001,650,09-01-2013
1630782,C003,I006,2500,09-02-2013
1630783,C002,I002,340,09-03-2013
1630784,C004,I006,1260,09-04-2013
1630785,C005,I003,1500,09-05-2013
  • Customers表:
CustomerID CustomerName Country
C001 Telefunken Germany
C002 Logica Belgium
C003 Salora Oy Finland
C004 Alps Nordic AB Sweden
C005 Deister Electronics Germany
C006 Thales Nederland Netherlands

数据保存到 Customers.csv,内容格式为:

C001,Telefunken,Germany
C002,Logica,Belgium
C003,Salora Oy,Finland
C004,Alps Nordic AB,Sweden
C005,Deister Electronics,Germany
C006,Thales Nederland,Netherlands
  • Items表:
ItemID ItemName Price
I001 BX016 15.96
I002 MU947 20.35
I003 MU3508 9.6
I004 XC7732 55.24
I005 XT0019 12.65
I006 XT2217 12.35

数据保存到 Items.csv,内容格式为:

I001,BX016,15.96
I002,MU947,20.35
I003,MU3508,9.6
I004,XC7732,55.24
I005,XT0019,12.65
I006,XT2217,12.35

2. 创建表

Orders.sql 文件内容为:

create table IF NOT EXISTS Orders (
   OrderID Integer,
   CustomerID Char(4),
   ItemID Char(4),
   Quantity Integer,
   Date Char(10)
   constraint pk Primary key(OrderID)
);

Customers.sql 文件内容为:

create table IF NOT EXISTS Customers (
  CustomerID Char(4),
  CustomerName Varchar(50),
  Country Varchar(50)
  constraint pk Primary key(CustomerID)
);

Items.sql 文件内容为:

create table IF NOT EXISTS Items (
  ItemID Char(4),
  ItemName Char(10),
  Price Decimal(25,2)
  constraint pk Primary key(ItemID)
);
  • 我们将上面的数据导入到创建的表中:
bin/psql.py slave1,slave2,slave3:2181 Orders.sql Orders.csv

bin/psql.py slave1,slave2,slave3:2181 Customers.sql Customers.csv

bin/psql.py slave1,slave2,slave3:2181 Items.sql Items.sql

3. 执行Join

SELECT O.OrderID,C.CustomerName,C.Country, O.Date
FROM Orders AS O
INNER JOIN Customers AS C
ON O.CustomerID = C.CustomerID; 

执行过程以及结果如下:

基于索引的 Join 操作

当执行 join 查询操作时,二级索引能够自动地被利用。如果我们在 Orders 和 Items 上分别创建索引,如下:

CREATE INDEX iOrders ON Orders (ItemID) INCLUDE (CustomerID, Quantity);

CREATE INDEX i2Orders ON Orders(CustomerID) INCLUDE (ItemID, Quantity);

CREATE INDEX iItems ON Items (ItemName) INCLUDE (Price);

如果你创建可变的二级索引,出现如下的错误:

Error: ERROR 1029 (42Y88): Mutable secondary indexes must have the hbase.regionserver.wal.codec property set to org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec in thehbase-sites.xml of every region server. tableName=IORDERS(state=42Y88,code=1029)

那么需要在 RegionServe r每个节点的 hbase-site.xml 中配置参数:

<property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>

然后重启 HBase 集群。

执行查询:

SELECT ItemName,sum(Price * Quantity) AS OrderValue
FROM Items
JOIN Orders
ON Items.ItemID = Orders.ItemID
WHERE Orders.CustomerID > 'C002'
GROUP BY ItemName;

查询结果为:

通过 explain 查询执行计划:

explain SELECT ItemName,sum(Price * Quantity) AS OrderValue
FROM Items
JOIN Orders
ON Items.ItemID = Orders.ItemID
WHERE Orders.CustomerID > 'C002'
GROUP BY ItemName;

在这个案例中,可以看到索引 iItems 和索引 i2Orders 都有使用。

GroupedJoins and Derived Tables

Phoenix 也支持复杂的 Join 语法,比如 grouped joins(或子查询),以及 derived-tables(派生表)的join操作。

对于 grouped joins来说,如下:

SELECT O.OrderID,C.CustomerID, I.ItemID

FROM Customers AS C
INNER JOIN(
    Items AS I
    INNER JOIN OrdersAS O
    ON I.ItemID = O.ItemID
) ON C.CustomerID = O.CustomerID;

通过使用一个子查询(derived table)替换 sub join,得到相等的查询:

SELECT 
  S.OrderID,
  C.CustomerID, 
  S.ItemID
FROM Customers AS C
INNER JOIN ( 
    select 
      O.CustomerID,
      O.OrderID,
      I.ItemID
    from Items AS I
    INNER JOIN Orders AS O
    ON I.ItemID = O.ItemID
) S ON C.CustomerID = S.CustomerID;

上面两个查询结果都为:

  • Hash Join vs. Sort-Merge Join

基本的 Hash Join 通常比其他类型的 join 算法更好,但是它也有一些限制,其中最典型的一个特性是关系中的一张表要小到能够加载到内存中。Phoenix 同时支持 Hash Join 和 Sort-Merge Join 去实现快速的 join 操作以及两张大表之间的 join 操作。

Phoenix 目前尽可能地使用 Hash Join 算法,因为通常更快。但是我们可以使用 USE_SORT_MERGE_JOIN 的 hint 在查询中使用 Sort-Merge Join。对于这两种 join 的算法的选择将来会根据表的统计信息自动选择。

  • Foreign Key to Primary Key Join Optimization

通常情况下,一个 join 发生在一个 child 表到一个 parent 表,通过 child 表的外键映射到一个 parent 表的主键。因此代替对 parent 表的全表扫描,Phoenix 将基于 child 表的外键值对 parent 表进行 skip-scan 或 range-scan 扫描。

下面我们举个例子,parent 表为 Employee,child 表为 Patent。

CREATE TABLE Employee (
  Region VARCHAR NOT NULL,
  LocalID VARCHAR NOT NULL,
  Name VARCHAR,
  StartDate DATE,
CONSTRAINT pk PRIMARY KEY (Region, LocalID));
CREATE TABLE Patent (
  PatentID VARCHAR NOT NULL,
  Region VARCHAR,
  LocalID VARCHAR,
  Title VARCHAR,
  Category VARCHAR,
  FileDate DATE,
CONSTRAINT pk PRIMARY KEY (PatentID));
SELECT E.Name, E.Region, P.PCount
FROM Employee AS E
JOIN
  (SELECT Region, LocalID, count(*) AS PCount
  FROM Patent
  WHERE FileDate >= to_date('2000-01-01')
  GROUP BY Region, LocalID) AS P
ON E.Region = P.Region AND E.LocalID =P.LocalID;

上面的查询语句通过Region和LocalID两个join的key对表“Employee”使用skip-scan扫描。下面是查询使用和不使用这个优化的执行时间(“Employee”大概5000000 行,“Patent”大约1000 行记录):

W/O(没有) Optimization W/(有) Optimization
8.1s 0.4s

然而,当考虑到child外键的值完全在parent表的主键空间中,因此使用skip-scan只会慢不会快的。你可以总是通过指定“NO_CHILD_PARENT_OPTIMIZATION”的hint关闭这个优化。将来,通过表的统计信息智能地选择这两种模式。

4. 配置

前面我们提到,使用Hash Join时,前提条件是将关联中的一张表加载到内存中,以便广播到所有的服务器,因此需要考虑RegionServer服务器的堆内存足够大以便容纳较小的表,我们也需要关注一下使用Hash Join的几个关键性的配置参数。

服务器端缓存用于存放哈希表,缓存的大小和生存时间由下面的几个参数控制。

  1. phoenix.query.maxServerCacheBytes

一个relation在被压缩和发送到RegionServer前的最大原始数据大小(bytes)。

如果尝试去序列化一个relation原始数据大小超过这个值的话,会导致MaxServerCacheSizeExceededException错误。

默认值为:104857600

  1. phoenix.query.maxGlobalMemoryPercentage

所有threads使用的堆内存百分比(Runtime.getRuntime().maxMemory())。

默认值为:15

  1. phoenix.coprocessor.maxServerCacheTimeToLiveMs

服务器端缓存的最大生存时间。

当在服务器端出现IO异常(“Could not find hash cache for joinId”),就考虑调整这个参数了。

如果获取到“Earlier hash cache(s) might have expired on servers”告警日志时,可以提升这个参数的值。

默认值为:30000(30s)

尽管有时可以通过修改参数来解决上面提到的一些异常问题,但是强烈建议首先要考虑优化join的查询,大家可以学习下面的优化部分内容。

5. 查询优化

下面是默认 join 的顺序(没有表统计信息的存在),查询的一边作为“smaller”relation并且将被加载到服务器的内存中:

  1. lhs INNER JOIN rhs:rhs将在服务器的内存中建立hash表

  2. lhs LEFT OUTER JOIN rhs:rhs将在服务器的内存中建立hash表

  3. lhs RIGHT OUTER JOIN rhs:lhs将在服务器的内存中建立hash表

对于多个join查询来说,join 的顺序是比较复杂的,你可以使用 explain 来查询真正的执行计划。对于 multiple-inner-join 查询来说,Phoenix 默认应用 star-join 优化,意味着 join 所有右手边的表的同时,leading 的表(即左手边的表)将仅仅被扫描一次。当所有右手边的表的大小超过内存限制时,你可以通过使用 NO_STAR_JOIN 的hint来关闭这个优化。

下面我们来看一下之前的查询示例:

explain SELECT O.OrderID, C.CustomerName, I.ItemName, I.Price,O.Quantity
FROM Orders AS O
INNER JOIN Customers AS C
ON O.CustomerID = C.CustomerID
INNER JOIN Items AS I
ON O.ItemID = I.ItemID;

默认的 Join 顺序为(使用 star-join 优化):

  1. SCAN Customers –> BUILD HASH[0]

    SCAN Items –> BUILD HASH[1]

  2. SCAN Orders JOIN HASH[0], HASH[1] –> Final Resultset

另外,如果我们使用 NO_STAR_JOIN 的 hint,如下:

SELECT /*+ NO_STAR_JOIN*/ O.OrderID, C.CustomerName, I.ItemName, I.Price, O.Quantity
FROM Orders AS O
INNER JOIN Customers AS C
ON O.CustomerID = C.CustomerID
INNER JOIN Items AS I
ON O.ItemID = I.ItemID;

这次的 Join 顺序为:

  1. SCAN Customers –> BUILD HASH[0]

  2. SCAN Orders JOIN HASH[0]; CLOSE HASH[0] –> BUILD HASH[1]

  3. SCAN Items JOIN HASH[1] –> Final Resultset

这里需要说明的是,并不是表的整个数据集都计算到内存占用的,而是只有查询使用的列数据并且过滤后的记录才会在服务器端建立Hash表。


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Phoenix on HBase 性能测试 Phoenix on HBase 性能测试
1. 目的测试 phoenix 引擎在 hbase 上的数据查询与数据写入的性能,探索 phoenix 的特性和限制。 2. 软件版本phoenix-4.8.0-HBase-1.1 HBase-1.1.6 3. 硬件环境四台配置相同的PC机
2017-07-20
下一篇 
Table 'SYSTEM.CATALOG' was not found, got Table 'SYSTEM.CATALOG' was not found, got
1. 异常日志 Error: Table 'SYSTEM.CATALOG' was not found, got: IP. (state=08000,code=101) org.apache.phoenix.exception.Phoeni
2017-05-20
  目录