HBase 协处理器 endpoint 使用记录


一.安装protoc

链接:http://pan.baidu.com/s/1eSbOZXw 密码:agcd

1.从这里下载 protobuf-2.5.0.tar.gz 和 protoc-2.5.0-win32.zip 两个包。分别解压到各自目录。

2.将protoc-2.5.0-win32中的protoc.exe拷贝到c:\windows\system32中。

3.将proto.exe文件拷贝到解压后的XXX\protobuf-2.5.0\src目录中.

4.进入XXX\protobuf-2.4.1\java 目录 执行maven package命令编辑该包 生成protobuf-java-2.5.0.jar文件(位于target目录中)。

5.进入XXX\protobuf-2.4.1\examples目录,可以看到addressbook.proto文件,

执行命令 protoc –java_out=. addressbook.proto 命令,如果生成com文件夹并在最终生成AddressBookProtos类则说明安装成功。

E:\protobuf-2.5.0\protobuf-2.5.0\examples>protoc –java_out=. addressbook

E:\protobuf-2.5.0\protobuf-2.5.0\examples>protoc –java_out=. test.proto

二、制作proto文件

基本的东西相信大家也了解了,直接步入主题了:

1、限定修饰符介绍 required\optional\repeated

  1. required 必须的字段,如果不赋值就会抛出 com.google.protobuf.UninitializedMessageException: Message missing required fields: …异常

  2. optional 可选字段,没什么好说的就是可有可无咯

  3. repeated 可重复的字段可以用来表示数组,在这里我还小小的纠结了会,搞过去就好了(纠结了好一会才知道protobuf数组怎么定义)。其实定义数组很简单repeated string name=字段号;然后在赋值的刚开始用数组的形式来赋值,会抛出Java.lang.IndexOutOfBoundsException: Index: 0, Size: 0的异常,我在想size为0,也就是说不能这样搞呀,然后看了下源码是com.google.protobuf.LazyStringList name_ = com.google.protobuf.LazyStringArrayList.EMPTY这样的,也就是说这个玩儿就是个集合嘛,所以赋值就用集合那套来搞定好了

2.1 创建新的文件 TopNProto.proto

2.2 添加如下内容

option java_outer_classname = “TopNProtos”;

option java_generic_services = true;

option java_generate_equals_and_hash = true;

option optimize_for = SPEED;

message TopNRequest {

required string startDate = 1;

required string endDate = 2;

required string followType = 3;

required string systemName = 4;

required int32 topCount = 5;

}

message TopNResponse {

required string topNUsers = 1;

}

service TopNService {

rpc getTopNUser(TopNRequest)

returns (TopNResponse);

}

2.3生成文件

在命令行下 进入文件所在目录输入如下命令

E:\protobuf-2.5.0\protobuf-2.5.0\examples>protoc –java_out=. TopNProto.proto

可以查看生成的文件如下连接所示说明创建成功

三、将protocol 用maven 打包

进入到 protobuf-2.5.0\java 文件夹下 用maven 打包 mvn package

进入到 target 目录下查看是否打包成功

存在 protobuf-java-2.5.0.jar 文件说明打包成功

四、Endpoint server端代码

package com.hnbian.endpoint;

import com.alibaba.fastjson.JSON;
import com.google.protobuf.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.rest.model.StorageClusterStatusModel;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.util.*;

/**
 * Created by bianhaonan on 2016/6/17.
 */
public class TopNUsersEndpoint extends TopNProtos.TopNService implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    @Override
    public void getTopNUser(RpcController controller, TopNProtos.TopNRequest request, RpcCallback<TopNProtos.TopNResponse> done) {
        Scan scan = new Scan();
        TopNProtos.TopNResponse response = null;
        int topCount = request.getTopCount();
        List<Filter> filterList = new ArrayList<Filter>();
        Filter filter1 = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes(request.getStartKey())));
        Filter filter2 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(request.getEndKey())));
        filterList.add(filter1);
        filterList.add(filter2);
        Filter filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, filterList);
        scan.setFilter(filter);
        InternalScanner  resultScanner = null;
        List<User> userList = new ArrayList<User>();
        for(int i=0;i<=topCount;i++) userList.add(new User());
        try {
            resultScanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            do {
                hasMore = resultScanner.next(results);
                User user = new User();
                Result res = Result.create(results);
                if(!res.isEmpty()){
                    user.setUserId(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("userId"))));
                    user.setUrl(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("url"))));              
                    user.setStartTime(Long.parseLong(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("startTime")))));    
                    user.setEndTime(Long.parseLong(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("endTime")))));
                    userList.set(topCount,user);
                    Collections.sort(userList,new ComparatorUserByElapsed());
                    results.clear();
                }
            } while (hasMore);
            //删除最后一个元素
            userList.remove(topCount);
            if (userList.get(1).getElapsed() == 0) userList.clear();
            response =  TopNProtos.TopNResponse.newBuilder().setTopNUsers(JSON.toJSONString(userList)).build();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
           close(null,null,resultScanner);
        }
        done.run(response);
    }

    @Override
    public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {

        if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) coprocessorEnvironment;
            if(null == this.env){
                throw new CoprocessorException("this.env was be null ! ! ! ! ! ! ! " );
            }
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }

    }

    @Override
    public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {

    }

    @Override
    public Service getService() {
        return this;
    }

    public void close(HConnection hc, HTableInterface ta, InternalScanner rs){
        try {
            if(null != hc) hc.close();
            if(null!=ta) ta.close();
            if(null!=rs) rs.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

五、客户端调用代码

@Override
public List<User> getLogTopN(String startKey, String endKey, int topCount) {
    List<User>  userList = new ArrayList<User>();
    try{
        hConn = HbaseUtil.gethConn();
        table = hConn.getTable(Constants.TRACER_RECORD_TABLE);
        //HtableDescriptor htd = new HtableDescriptor();
        final TopNProtos.TopNRequest request = TopNProtos.TopNRequest.newBuilder().setStartKey(startKey).setEndKey(endKey).setTopCount(topCount).build();
        Map<byte[], String> results = table.coprocessorService(//调用协处理器
                TopNProtos.TopNService.class, null, null, new Batch.Call<TopNProtos.TopNService, String>() {
                    @Override
                    public String call(TopNProtos.TopNService topNService) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<TopNProtos.TopNResponse> rpcCallback = new BlockingRpcCallback<TopNProtos.TopNResponse>();
                        topNService.getTopNUser(controller,request,rpcCallback);
                        TopNProtos.TopNResponse response = rpcCallback.get();
                        if(controller.failedOnException()){
                            throw controller.getFailedOn();
                        }
                        return response.getTopNUsers();
                    }
                });
        for(String s:results.values()){
            System.out.println(s);
            userList.addAll(JSONArray.parseArray(s,User.class));
        }
        if (userList.size()<topCount) userList.subList(0,topCount-1);
    }catch (Exception e){
        e.printStackTrace();
    } catch (Throwable throwable) {
        throwable.printStackTrace();
    } finally {
        close(hConn,table,null);
    }
    return userList;
}

六、部署与测试

部署:将写好的 endpoint server 端的代码打包 与将 protocol 用 maven 打包 的 jar 一起上传到 hbase lib 目录下

重启 HBase

hbase shell

import com.hnbian.endpoint.TopNUsersEndpoint

#向表中添加Hbase协处理器

#首先关闭表
disable ‘user_record’

#向表中添加协处理器
alter ‘user_record’ ,METHOD=>’table_att’,’coprocessor’=>|com.hnbian.endpoint.TopNUsersEndpoint||#启用表
enable ‘user_record’

#查看是否安装成功
describe ‘user_record’

#删除协处理器 这里的NAME=>’?’需要根据自己的协处理器名称去修改
alter ‘user_record’, METHOD => ‘table_att_unset’,NAME=>’coprocessor$2

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
HBASE中获取多个版本的值 HBASE中获取多个版本的值
在 HBase 中 一个 row 对应的相同的列只会有一行。使用 scan 或 get 得到都是最新的数据,如果我们对这某一 row 所对应的列进行了更改操作后,并不会多生成一条数据,不会像数据库一样,插入时多生成一条记录,在HBase中对
2016-07-25
下一篇 
org.apache.hadoop.hbase.util.ByteStringer org.apache.hadoop.hbase.util.ByteStringer
1. 问题描述由于 maven 项目中使用的 protobuf-3.0.0 版本和 HBase 中使用的 protobuf-2.5.0 不一致,导致抛出异常 java.lang.NoClassDefFoundError:org.apache
2016-04-20
  目录