一.安装protoc
链接:http://pan.baidu.com/s/1eSbOZXw 密码:agcd
1.从这里下载 protobuf-2.5.0.tar.gz 和 protoc-2.5.0-win32.zip 两个包。分别解压到各自目录。
2.将protoc-2.5.0-win32中的protoc.exe拷贝到c:\windows\system32中。
3.将proto.exe文件拷贝到解压后的XXX\protobuf-2.5.0\src目录中.
4.进入XXX\protobuf-2.4.1\java 目录 执行maven package命令编辑该包 生成protobuf-java-2.5.0.jar文件(位于target目录中)。
5.进入XXX\protobuf-2.4.1\examples目录,可以看到addressbook.proto文件,
执行命令 protoc –java_out=. addressbook.proto 命令,如果生成com文件夹并在最终生成AddressBookProtos类则说明安装成功。
E:\protobuf-2.5.0\protobuf-2.5.0\examples>protoc –java_out=. addressbook
E:\protobuf-2.5.0\protobuf-2.5.0\examples>protoc –java_out=. test.proto
二、制作proto文件
基本的东西相信大家也了解了,直接步入主题了:
1、限定修饰符介绍 required\optional\repeated
required 必须的字段,如果不赋值就会抛出 com.google.protobuf.UninitializedMessageException: Message missing required fields: …异常
optional 可选字段,没什么好说的就是可有可无咯
repeated 可重复的字段可以用来表示数组,在这里我还小小的纠结了会,搞过去就好了(纠结了好一会才知道protobuf数组怎么定义)。其实定义数组很简单repeated string name=字段号;然后在赋值的刚开始用数组的形式来赋值,会抛出Java.lang.IndexOutOfBoundsException: Index: 0, Size: 0的异常,我在想size为0,也就是说不能这样搞呀,然后看了下源码是com.google.protobuf.LazyStringList name_ = com.google.protobuf.LazyStringArrayList.EMPTY这样的,也就是说这个玩儿就是个集合嘛,所以赋值就用集合那套来搞定好了
2.1 创建新的文件 TopNProto.proto
2.2 添加如下内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| option java_outer_classname = “TopNProtos”;
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message TopNRequest {
required string startDate = 1;
required string endDate = 2;
required string followType = 3;
required string systemName = 4;
required int32 topCount = 5;
}
message TopNResponse {
required string topNUsers = 1;
}
service TopNService {
rpc getTopNUser(TopNRequest)
returns (TopNResponse);
}
|
2.3生成文件
在命令行下 进入文件所在目录输入如下命令
E:\protobuf-2.5.0\protobuf-2.5.0\examples>protoc –java_out=. TopNProto.proto
可以查看生成的文件如下连接所示说明创建成功
三、将protocol 用maven 打包
进入到 protobuf-2.5.0\java 文件夹下 用maven 打包 mvn package
进入到 target 目录下查看是否打包成功
存在 protobuf-java-2.5.0.jar 文件说明打包成功
四、Endpoint server端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| package com.hnbian.endpoint; import com.alibaba.fastjson.JSON; import com.google.protobuf.*; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.rest.model.StorageClusterStatusModel; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import java.io.IOException; import java.util.*;
public class TopNUsersEndpoint extends TopNProtos.TopNService implements Coprocessor, CoprocessorService { private RegionCoprocessorEnvironment env; @Override public void getTopNUser(RpcController controller, TopNProtos.TopNRequest request, RpcCallback<TopNProtos.TopNResponse> done) { Scan scan = new Scan(); TopNProtos.TopNResponse response = null; int topCount = request.getTopCount(); List<Filter> filterList = new ArrayList<Filter>(); Filter filter1 = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes(request.getStartKey()))); Filter filter2 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(request.getEndKey()))); filterList.add(filter1); filterList.add(filter2); Filter filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, filterList); scan.setFilter(filter); InternalScanner resultScanner = null; List<User> userList = new ArrayList<User>(); for(int i=0;i<=topCount;i++) userList.add(new User()); try { resultScanner = env.getRegion().getScanner(scan); List<Cell> results = new ArrayList<Cell>(); boolean hasMore = false; do { hasMore = resultScanner.next(results); User user = new User(); Result res = Result.create(results); if(!res.isEmpty()){ user.setUserId(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("userId")))); user.setUrl(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("url")))); user.setStartTime(Long.parseLong(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("startTime"))))); user.setEndTime(Long.parseLong(Bytes.toString(res.getValue(Bytes.toBytes("user"),Bytes.toBytes("endTime"))))); userList.set(topCount,user); Collections.sort(userList,new ComparatorUserByElapsed()); results.clear(); } } while (hasMore); userList.remove(topCount); if (userList.get(1).getElapsed() == 0) userList.clear(); response = TopNProtos.TopNResponse.newBuilder().setTopNUsers(JSON.toJSONString(userList)).build(); } catch (IOException e) { e.printStackTrace(); }finally { close(null,null,resultScanner); } done.run(response); } @Override public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException { if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment) { this.env = (RegionCoprocessorEnvironment) coprocessorEnvironment; if(null == this.env){ throw new CoprocessorException("this.env was be null ! ! ! ! ! ! ! " ); } } else { throw new CoprocessorException("Must be loaded on a table region!"); } } @Override public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException { } @Override public Service getService() { return this; } public void close(HConnection hc, HTableInterface ta, InternalScanner rs){ try { if(null != hc) hc.close(); if(null!=ta) ta.close(); if(null!=rs) rs.close(); }catch (Exception e){ e.printStackTrace(); } } }
|
五、客户端调用代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Override public List<User> getLogTopN(String startKey, String endKey, int topCount) { List<User> userList = new ArrayList<User>(); try{ hConn = HbaseUtil.gethConn(); table = hConn.getTable(Constants.TRACER_RECORD_TABLE); final TopNProtos.TopNRequest request = TopNProtos.TopNRequest.newBuilder().setStartKey(startKey).setEndKey(endKey).setTopCount(topCount).build(); Map<byte[], String> results = table.coprocessorService( TopNProtos.TopNService.class, null, null, new Batch.Call<TopNProtos.TopNService, String>() { @Override public String call(TopNProtos.TopNService topNService) throws IOException { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<TopNProtos.TopNResponse> rpcCallback = new BlockingRpcCallback<TopNProtos.TopNResponse>(); topNService.getTopNUser(controller,request,rpcCallback); TopNProtos.TopNResponse response = rpcCallback.get(); if(controller.failedOnException()){ throw controller.getFailedOn(); } return response.getTopNUsers(); } }); for(String s:results.values()){ System.out.println(s); userList.addAll(JSONArray.parseArray(s,User.class)); } if (userList.size()<topCount) userList.subList(0,topCount-1); }catch (Exception e){ e.printStackTrace(); } catch (Throwable throwable) { throwable.printStackTrace(); } finally { close(hConn,table,null); } return userList; }
|
六、部署与测试
部署:将写好的 endpoint server 端的代码打包 与将 protocol 用 maven 打包 的 jar 一起上传到 hbase lib 目录下
重启 HBase
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| hbase shell
import com.hnbian.endpoint.TopNUsersEndpoint
disable ‘user_record’
alter ‘user_record’ ,METHOD=>’table_att’,’coprocessor’=>’|com.hnbian.endpoint.TopNUsersEndpoint||’
enable ‘user_record’
describe ‘user_record’
alter ‘user_record’, METHOD => ‘table_att_unset’,NAME=>’coprocessor$2′
|