一.安装protoc
链接:http://pan.baidu.com/s/1eSbOZXw 密码:agcd
1.从这里下载 protobuf-2.5.0.tar.gz 和 protoc-2.5.0-win32.zip 两个包。分别解压到各自目录。
2.将protoc-2.5.0-win32中的protoc.exe拷贝到c:\windows\system32中。
3.将protoc.exe文件拷贝到解压后的XXX\protobuf-2.5.0\src目录中.
4.进入XXX\protobuf-2.5.0\java 目录,执行 mvn package 命令编译该包,生成protobuf-java-2.5.0.jar文件(位于target目录中)。
5.进入XXX\protobuf-2.5.0\examples目录,可以看到addressbook.proto文件,
执行命令 protoc --java_out=. addressbook.proto,如果生成com文件夹并最终生成AddressBookProtos类,则说明安装成功。
E:\protobuf-2.5.0\protobuf-2.5.0\examples>protoc --java_out=. addressbook.proto
E:\protobuf-2.5.0\protobuf-2.5.0\examples>protoc --java_out=. test.proto
二、制作proto文件
基本的东西相信大家也了解了,直接步入主题了:
1、限定修饰符介绍 required\optional\repeated
required 必须的字段,如果不赋值就会抛出 com.google.protobuf.UninitializedMessageException: Message missing required fields: …异常
optional 可选字段,没什么好说的就是可有可无咯
repeated 可重复的字段,可以用来表示数组。定义方式很简单:repeated string name = 字段号;。我一开始按数组下标的方式给它赋值,结果抛出了 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 异常(size 为 0,说明不能按下标直接赋值)。查看生成的源码后发现,该字段的定义是 com.google.protobuf.LazyStringList name_ = com.google.protobuf.LazyStringArrayList.EMPTY,也就是说它本质上是一个集合,所以赋值时应该使用集合的方式(例如调用 builder 的 addName(...) 方法逐个添加元素)。
2.1 创建新的文件 TopNProto.proto
2.2 添加如下内容
// Messages and RPC service for the TopN endpoint coprocessor (proto2 syntax).
//
// The generated outer class is placed in the endpoint's Java package so that
// TopNUsersEndpoint (package com.hnbian.endpoint) can reference TopNProtos
// without an import.
option java_package = "com.hnbian.endpoint";
option java_outer_classname = "TopNProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message TopNRequest {
  // Inclusive lower bound of the row-key range to scan
  // (read by the endpoint through getStartKey()).
  required string startKey = 1;
  // Inclusive upper bound of the row-key range to scan
  // (read by the endpoint through getEndKey()).
  required string endKey = 2;
  // Not set by the client shown in this document, so these must be optional:
  // a missing required field makes build() throw UninitializedMessageException.
  optional string followType = 3;
  optional string systemName = 4;
  // Maximum number of users each region returns.
  required int32 topCount = 5;
}

message TopNResponse {
  // JSON array (serialized list of User objects) with this region's top users.
  required string topNUsers = 1;
}

service TopNService {
  rpc getTopNUser(TopNRequest)
      returns (TopNResponse);
}
2.3生成文件
在命令行下 进入文件所在目录输入如下命令
E:\protobuf-2.5.0\protobuf-2.5.0\examples>protoc --java_out=. TopNProto.proto
命令执行后,可以在输出目录中看到生成的 TopNProtos.java 文件,说明创建成功。
三、将protocol 用maven 打包
进入到 protobuf-2.5.0\java 文件夹下 用maven 打包 mvn package
进入到 target 目录下查看是否打包成功
存在 protobuf-java-2.5.0.jar 文件说明打包成功
四、Endpoint server端代码
package com.hnbian.endpoint;
import com.alibaba.fastjson.JSON;
import com.google.protobuf.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.rest.model.StorageClusterStatusModel;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import java.io.IOException;
import java.util.*;
/**
* Created by bianhaonan on 2016/6/17.
*/
public class TopNUsersEndpoint extends TopNProtos.TopNService implements Coprocessor, CoprocessorService {
private RegionCoprocessorEnvironment env;
@Override
public void getTopNUser(RpcController controller, TopNProtos.TopNRequest request, RpcCallback<TopNProtos.TopNResponse> done) {
Scan scan = new Scan();
TopNProtos.TopNResponse response = null;
int topCount = request.getTopCount();
List<Filter> filterList = new ArrayList<Filter>();
Filter filter1 = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes(request.getStartKey())));
Filter filter2 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(request.getEndKey())));
filterList.add(filter1);
filterList.add(filter2);
Filter filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, filterList);
scan.setFilter(filter);
InternalScanner resultScanner = null;
List<User> userList = new ArrayList<User>();
for(int i=0;i<=topCount;i++) userList.add(new User());
try {
resultScanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = false;
do {
hasMore = resultScanner.next(results);
User user = new User();
Result res = Result.create(results);
if(!res.isEmpty()){
user.setUserId(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("userId"))));
user.setUrl(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("url"))));
user.setStartTime(Long.parseLong(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("startTime")))));
user.setEndTime(Long.parseLong(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("endTime")))));
userList.set(topCount,user);
Collections.sort(userList,new ComparatorUserByElapsed());
results.clear();
}
} while (hasMore);
//删除最后一个元素
userList.remove(topCount);
if (userList.get(1).getElapsed() == 0) userList.clear();
response = TopNProtos.TopNResponse.newBuilder().setTopNUsers(JSON.toJSONString(userList)).build();
} catch (IOException e) {
e.printStackTrace();
}finally {
close(null,null,resultScanner);
}
done.run(response);
}
@Override
public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment) coprocessorEnvironment;
if(null == this.env){
throw new CoprocessorException("this.env was be null ! ! ! ! ! ! ! " );
}
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
}
@Override
public Service getService() {
return this;
}
public void close(HConnection hc, HTableInterface ta, InternalScanner rs){
try {
if(null != hc) hc.close();
if(null!=ta) ta.close();
if(null!=rs) rs.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
五、客户端调用代码
/**
 * Queries the TopN endpoint coprocessor on every region of the record table and
 * merges the per-region answers into one list of at most {@code topCount} users.
 *
 * @param startKey inclusive lower bound of the row-key range sent to the endpoint
 * @param endKey   inclusive upper bound of the row-key range sent to the endpoint
 * @param topCount maximum number of users to return
 * @return the merged users; the entries collected so far (possibly none) if the
 *         call fails, since failures are only printed
 */
@Override
public List<User> getLogTopN(String startKey, String endKey, int topCount) {
    List<User> userList = new ArrayList<User>();
    try {
        hConn = HbaseUtil.gethConn();
        table = hConn.getTable(Constants.TRACER_RECORD_TABLE);
        final TopNProtos.TopNRequest request = TopNProtos.TopNRequest.newBuilder()
                .setStartKey(startKey)
                .setEndKey(endKey)
                .setTopCount(topCount)
                .build();
        // Invoke the coprocessor; null start/end row keys select every region of the table.
        Map<byte[], String> results = table.coprocessorService(
                TopNProtos.TopNService.class, null, null,
                new Batch.Call<TopNProtos.TopNService, String>() {
                    @Override
                    public String call(TopNProtos.TopNService topNService) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<TopNProtos.TopNResponse> rpcCallback =
                                new BlockingRpcCallback<TopNProtos.TopNResponse>();
                        topNService.getTopNUser(controller, request, rpcCallback);
                        TopNProtos.TopNResponse response = rpcCallback.get();
                        if (controller.failedOnException()) {
                            throw controller.getFailedOn();
                        }
                        return response.getTopNUsers();
                    }
                });
        for (String s : results.values()) {
            System.out.println(s);
            userList.addAll(JSONArray.parseArray(s, User.class));
        }
        // Every region returns its own top entries, so the merged list must be
        // re-sorted before trimming. Uses the same comparator as the endpoint;
        // NOTE(review): assumes it is a public class in com.hnbian.endpoint - confirm.
        java.util.Collections.sort(userList, new com.hnbian.endpoint.ComparatorUserByElapsed());
        int keep = Math.max(topCount, 0);
        if (userList.size() > keep) {
            // Trim in place: drop everything after the first `keep` entries.
            userList.subList(keep, userList.size()).clear();
        }
    } catch (Throwable throwable) {
        // coprocessorService declares Throwable; one handler covers both original catch blocks.
        throwable.printStackTrace();
    } finally {
        close(hConn, table, null);
    }
    return userList;
}
六、部署与测试
部署:将写好的 endpoint server 端代码打成 jar 包,连同第三步用 maven 打包生成的 protobuf-java-2.5.0.jar 一起上传到 HBase 的 lib 目录下
重启 HBase
hbase shell
import com.hnbian.endpoint.TopNUsersEndpoint
#向表中添加Hbase协处理器
#首先关闭表
disable 'user_record'
#向表中添加协处理器
alter 'user_record', METHOD => 'table_att', 'coprocessor' => '|com.hnbian.endpoint.TopNUsersEndpoint||'
#启用表
enable 'user_record'
#查看是否安装成功
describe 'user_record'
#删除协处理器 这里的 NAME => '?' 需要根据 describe 输出中自己的协处理器名称(如 coprocessor$1、coprocessor$2)去修改
alter 'user_record', METHOD => 'table_att_unset', NAME => 'coprocessor$2'