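I. HBase Coprocessor
HBase coprocessors, inspired by BigTable coprocessors, give users a class library and a runtime environment so that user code can execute on the HBase RegionServer and Master.
Coprocessors are classified along two axes:
System coprocessors and table coprocessors
Observer and Endpoint
System coprocessor: loaded globally on every table and Region hosted by a RegionServer
Table coprocessor: the user attaches the coprocessor to one specific table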
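Observer: similar to a trigger in a relational database
1) RegionObserver: provides hooks for client data-manipulation events such as Get, Put, Delete, and Scan
2) MasterObserver: provides hooks for DDL-type operations such as creating, deleting, and modifying tables
3) WALObserver: provides hooks for WAL-related operations
Observer use cases
Security: for example, before a Get or Put is executed, the preGet (named preGetOp in HBase 1.x) or prePut method can check whether the operation is allowed
Referential integrity constraints: HBase does not support the relational-database notion of referential integrity, that is, foreign keys. A coprocessor can be used to enforce such a constraint
Secondary indexes: a coprocessor can be used to maintain a secondary index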
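The security use case can be pictured without any HBase classes. The following is a minimal plain-Java sketch of the "pre-hook that can veto a write" idea, under the assumption that a hook rejects an operation by throwing an IOException. `PrePutHook`, `MiniRegion`, `ObserverSketch`, and the allow-list rule are hypothetical names invented for illustration; they are not part of the HBase API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of an observer hook; not an HBase interface.
interface PrePutHook {
    // Runs before the write is applied; throwing vetoes the write.
    void prePut(String user, String row, String value) throws IOException;
}

// Hypothetical stand-in for a region: a map of row key to value plus its hooks.
final class MiniRegion {
    private final Map<String, String> rows = new HashMap<>();
    private final List<PrePutHook> hooks = new ArrayList<>();

    void register(PrePutHook hook) {
        hooks.add(hook);
    }

    void put(String user, String row, String value) throws IOException {
        // Every registered hook sees the write first, like a trigger.
        for (PrePutHook hook : hooks) {
            hook.prePut(user, row, value);
        }
        rows.put(row, value);
    }

    String get(String row) {
        return rows.get(row);
    }
}

final class ObserverSketch {
    // Builds a region whose only hook rejects users outside the allow-list.
    static MiniRegion securedRegion(final Set<String> allowedUsers) {
        MiniRegion region = new MiniRegion();
        region.register(new PrePutHook() {
            @Override
            public void prePut(String user, String row, String value) throws IOException {
                if (!allowedUsers.contains(user)) {
                    throw new IOException("user " + user + " may not write row " + row);
                }
            }
        });
        return region;
    }

    public static void main(String[] args) throws IOException {
        Set<String> allowed = new HashSet<>();
        allowed.add("alice");
        MiniRegion region = securedRegion(allowed);
        region.put("alice", "row1", "v1");
        try {
            region.put("bob", "row1", "v2");
        } catch (IOException rejected) {
            System.out.println("rejected: " + rejected.getMessage());
        }
        System.out.println("row1 = " + region.get("row1"));
    }
}
```

Because the check runs on the server side before the data is touched, a client cannot bypass it by using a different client library.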
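Endpoint: a dynamic endpoint is somewhat like a stored procedure
An Endpoint is the interface for dynamic RPC plugins. Its implementation is installed on the server side so that it can be invoked through HBase RPC
When a client calls the interface, the implementation is executed remotely on the target RegionServer
Typical case: a large table has several hundred Regions, and the average or the sum of one column must be computed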
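The average/sum case can be sketched in plain Java. This is only a model of the idea, not HBase code: each "region" computes a small partial result next to its own data, and the client merges one partial result per region. `PartialSum`, `EndpointSketch`, `aggregateRegion`, and `average` are hypothetical names, and each region's column values are assumed to be available as a list of longs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical per-region result: small enough to ship back to the client.
final class PartialSum {
    final long sum;
    final long count;

    PartialSum(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

final class EndpointSketch {
    // "Server side": runs once per region, next to that region's data.
    static PartialSum aggregateRegion(List<Long> regionValues) {
        long sum = 0;
        for (long value : regionValues) {
            sum += value;
        }
        return new PartialSum(sum, regionValues.size());
    }

    // "Client side": merges one partial result per region into the average.
    static double average(List<PartialSum> partials) {
        long sum = 0;
        long count = 0;
        for (PartialSum partial : partials) {
            sum += partial.sum;
            count += partial.count;
        }
        if (count == 0) {
            throw new IllegalArgumentException("no rows to average");
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        // Three regions of different sizes, one of them empty.
        List<List<Long>> regions = new ArrayList<>();
        regions.add(Arrays.asList(1L, 2L, 3L));
        regions.add(Arrays.asList(10L));
        regions.add(Collections.<Long>emptyList());

        List<PartialSum> partials = new ArrayList<>();
        for (List<Long> region : regions) {
            partials.add(aggregateRegion(region));
        }
        System.out.println("average = " + average(partials));
    }
}
```

Each region returns a sum and a count rather than its own average, because averaging per-region averages would give small regions too much weight. Only two numbers per region cross the network instead of every cell.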
II. Coprocessors in Practice
Implementing a RegionObserver coprocessor
1. First, write the pom file as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hbase-example</artifactId>
        <groupId>com.imooc.bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hbase-observer-test</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.4</version>
        </dependency>
    </dependencies>
</project>
2. Complete project structure:
RegionObserverTest:
package com.imooc.bigdata.hbase.coprocessor.observer;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionObserverTest extends BaseRegionObserver {

    private byte[] columnFamily = Bytes.toBytes("cf");
    private byte[] countCol = Bytes.toBytes("countCol");
    private byte[] unDeleteCol = Bytes.toBytes("unDeleteCol");
    private RegionCoprocessorEnvironment environment;

    // Runs before the RegionServer opens the region
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        environment = (RegionCoprocessorEnvironment) e;
    }

    // Called before the RegionServer closes the region
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
    }

    /**
     * 1. Accumulate cf:countCol: on every insert, the new value is added to the previous value.
     *    The values are stored as decimal strings.
     */
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
            Durability durability) throws IOException {
        if (put.has(columnFamily, countCol)) {
            // Read the old countCol value from the region
            Result rs = e.getEnvironment().getRegion().get(new Get(put.getRow()));
            int oldNum = 0;
            for (Cell cell : rs.rawCells()) {
                if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
                    oldNum = Integer.parseInt(Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
            // Read the new countCol value from the Put
            List<Cell> cells = put.get(columnFamily, countCol);
            int newNum = 0;
            for (Cell cell : cells) {
                if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
                    newNum = Integer.parseInt(Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
            // Sum the two values and update the Put instance
            put.addColumn(columnFamily, countCol, Bytes.toBytes(String.valueOf(oldNum + newNum)));
        }
    }

    /**
     * 2. unDeleteCol cannot be deleted directly; deleting countCol also deletes unDeleteCol.
     */
    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
            WALEdit edit,
            Durability durability) throws IOException {
        // Check whether the Delete touches the cf column family
        List<Cell> cells = delete.getFamilyCellMap().get(columnFamily);
        if (cells == null || cells.isEmpty()) {
            return;
        }
        boolean deleteFlag = false;
        for (Cell cell : cells) {
            byte[] qualifier = CellUtil.cloneQualifier(cell);
            if (Arrays.equals(qualifier, unDeleteCol)) {
                throw new IOException("can not delete unDel column");
            }
            if (Arrays.equals(qualifier, countCol)) {
                deleteFlag = true;
            }
        }
        if (deleteFlag) {
            delete.addColumn(columnFamily, unDeleteCol);
        }
    }
}
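The two rules enforced by this observer can be checked in isolation with a plain-Java model. The sketch below is a simplification and not HBase code: a row is assumed to be a map from qualifier to a decimal-string value, and each call touches a single qualifier. `RowRulesSketch` and its methods are hypothetical names; only the column names and the error message are taken from the class above.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one row in column family "cf": qualifier -> string value.
final class RowRulesSketch {
    static final String COUNT_COL = "countCol";
    static final String UN_DELETE_COL = "unDeleteCol";

    // Rule 1: a write to countCol is added to the stored value instead of replacing it.
    static void put(Map<String, String> row, String qualifier, String value) {
        if (COUNT_COL.equals(qualifier)) {
            String old = row.get(COUNT_COL);
            int oldNum = (old == null) ? 0 : Integer.parseInt(old);
            value = String.valueOf(oldNum + Integer.parseInt(value));
        }
        row.put(qualifier, value);
    }

    // Rule 2: unDeleteCol cannot be deleted directly, but goes away together with countCol.
    static void delete(Map<String, String> row, String qualifier) throws IOException {
        if (UN_DELETE_COL.equals(qualifier)) {
            throw new IOException("can not delete unDel column");
        }
        row.remove(qualifier);
        if (COUNT_COL.equals(qualifier)) {
            row.remove(UN_DELETE_COL);
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> row = new HashMap<>();
        put(row, COUNT_COL, "5");
        put(row, COUNT_COL, "3");
        put(row, UN_DELETE_COL, "keep");
        System.out.println("countCol = " + row.get(COUNT_COL));
        try {
            delete(row, UN_DELETE_COL);
        } catch (IOException vetoed) {
            System.out.println("vetoed: " + vetoed.getMessage());
        }
        delete(row, COUNT_COL);
        System.out.println("row after deleting countCol = " + row);
    }
}
```

As in the observer, a non-numeric counter value makes `Integer.parseInt` throw a NumberFormatException, so callers are assumed to write only decimal strings to countCol.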
Implementing an Endpoint coprocessor
1. First, write the pom file as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hbase-example</artifactId>
        <groupId>com.imooc.bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hbase-endpoint-test</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
    </dependencies>
</project>
2. Project structure:
3. GetRowCount (generated by protoc from RowCountTest.proto):
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: RowCountTest.proto
package com.imooc.bigdata.hbase.coprocessor.endpoint;
public final class GetRowCount {
private GetRowCount() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public interface getRowCountRequestOrBuilder
extends com.google.protobuf.MessageOrBuilder {
}
/**
* Protobuf type {@code getRowCountRequest}
*/
public static final class getRowCountRequest extends
com.google.protobuf.GeneratedMessage
implements getRowCountRequestOrBuilder {
// Use getRowCountRequest.newBuilder() to construct.
private getRowCountRequest(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private getRowCountRequest(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final getRowCountRequest defaultInstance;
public static getRowCountRequest getDefaultInstance() {
return defaultInstance;
}
public getRowCountRequest getDefaultInstanceForType() {
return defaultInstance;
}
private final com.google.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private getRowCountRequest(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountRequest_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountRequest_fieldAccessorTable
.ensureFieldAccessorsInitialized(
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest.class, com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest.Builder.class);
}
public static com.google.protobuf.Parser<getRowCountRequest> PARSER =
new com.google.protobuf.AbstractParser<getRowCountRequest>() {
public getRowCountRequest parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new getRowCountRequest(input, extensionRegistry);
}
};
@java.lang.Override
public com.google.protobuf.Parser<getRowCountRequest> getParserForType() {
return PARSER;
}
private void initFields() {
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code getRowCountRequest}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequestOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountRequest_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountRequest_fieldAccessorTable
.ensureFieldAccessorsInitialized(
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest.class, com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest.Builder.class);
}
// Construct using com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountRequest_descriptor;
}
public com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest getDefaultInstanceForType() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest.getDefaultInstance();
}
public com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest build() {
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest buildPartial() {
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest result = new com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest(this);
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest) {
return mergeFrom((com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest other) {
if (other == com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest.getDefaultInstance()) return this;
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
// @@protoc_insertion_point(builder_scope:getRowCountRequest)
}
static {
defaultInstance = new getRowCountRequest(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:getRowCountRequest)
}
public interface getRowCountResponseOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// optional int64 rowCount = 1;
/**
* <code>optional int64 rowCount = 1;</code>
*/
boolean hasRowCount();
/**
* <code>optional int64 rowCount = 1;</code>
*/
long getRowCount();
}
/**
* Protobuf type {@code getRowCountResponse}
*/
public static final class getRowCountResponse extends
com.google.protobuf.GeneratedMessage
implements getRowCountResponseOrBuilder {
// Use getRowCountResponse.newBuilder() to construct.
private getRowCountResponse(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private getRowCountResponse(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final getRowCountResponse defaultInstance;
public static getRowCountResponse getDefaultInstance() {
return defaultInstance;
}
public getRowCountResponse getDefaultInstanceForType() {
return defaultInstance;
}
private final com.google.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private getRowCountResponse(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 8: {
bitField0_ |= 0x00000001;
rowCount_ = input.readInt64();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountResponse_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountResponse_fieldAccessorTable
.ensureFieldAccessorsInitialized(
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.class, com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.Builder.class);
}
public static com.google.protobuf.Parser<getRowCountResponse> PARSER =
new com.google.protobuf.AbstractParser<getRowCountResponse>() {
public getRowCountResponse parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new getRowCountResponse(input, extensionRegistry);
}
};
@java.lang.Override
public com.google.protobuf.Parser<getRowCountResponse> getParserForType() {
return PARSER;
}
private int bitField0_;
// optional int64 rowCount = 1;
public static final int ROWCOUNT_FIELD_NUMBER = 1;
private long rowCount_;
/**
* <code>optional int64 rowCount = 1;</code>
*/
public boolean hasRowCount() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional int64 rowCount = 1;</code>
*/
public long getRowCount() {
return rowCount_;
}
private void initFields() {
rowCount_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeInt64(1, rowCount_);
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(1, rowCount_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code getRowCountResponse}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponseOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountResponse_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountResponse_fieldAccessorTable
.ensureFieldAccessorsInitialized(
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.class, com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.Builder.class);
}
// Construct using com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
rowCount_ = 0L;
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.internal_static_getRowCountResponse_descriptor;
}
public com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse getDefaultInstanceForType() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.getDefaultInstance();
}
public com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse build() {
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse buildPartial() {
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse result = new com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.rowCount_ = rowCount_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse) {
return mergeFrom((com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse other) {
if (other == com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.getDefaultInstance()) return this;
if (other.hasRowCount()) {
setRowCount(other.getRowCount());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// optional int64 rowCount = 1;
private long rowCount_ ;
/**
* <code>optional int64 rowCount = 1;</code>
*/
public boolean hasRowCount() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional int64 rowCount = 1;</code>
*/
public long getRowCount() {
return rowCount_;
}
/**
* <code>optional int64 rowCount = 1;</code>
*/
public Builder setRowCount(long value) {
bitField0_ |= 0x00000001;
rowCount_ = value;
onChanged();
return this;
}
/**
* <code>optional int64 rowCount = 1;</code>
*/
public Builder clearRowCount() {
bitField0_ = (bitField0_ & ~0x00000001);
rowCount_ = 0L;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:getRowCountResponse)
}
static {
defaultInstance = new getRowCountResponse(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:getRowCountResponse)
}
/**
* Protobuf service {@code hbaseEndPointTestService}
*/
public static abstract class hbaseEndPointTestService
implements com.google.protobuf.Service {
protected hbaseEndPointTestService() {}
public interface Interface {
/**
* <code>rpc getRowCount(.getRowCountRequest) returns (.getRowCountResponse);</code>
*/
public abstract void getRowCount(
com.google.protobuf.RpcController controller,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest request,
com.google.protobuf.RpcCallback<com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse> done);
}
public static com.google.protobuf.Service newReflectiveService(
final Interface impl) {
return new hbaseEndPointTestService() {
@java.lang.Override
public void getRowCount(
com.google.protobuf.RpcController controller,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest request,
com.google.protobuf.RpcCallback<com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse> done) {
impl.getRowCount(controller, request, done);
}
};
}
public static com.google.protobuf.BlockingService
newReflectiveBlockingService(final BlockingInterface impl) {
return new com.google.protobuf.BlockingService() {
public final com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptorForType() {
return getDescriptor();
}
public final com.google.protobuf.Message callBlockingMethod(
com.google.protobuf.Descriptors.MethodDescriptor method,
com.google.protobuf.RpcController controller,
com.google.protobuf.Message request)
throws com.google.protobuf.ServiceException {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.callBlockingMethod() given method descriptor for " +
"wrong service type.");
}
switch(method.getIndex()) {
case 0:
return impl.getRowCount(controller, (com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest)request);
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public final com.google.protobuf.Message
getRequestPrototype(
com.google.protobuf.Descriptors.MethodDescriptor method) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.getRequestPrototype() given method " +
"descriptor for wrong service type.");
}
switch(method.getIndex()) {
case 0:
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public final com.google.protobuf.Message
getResponsePrototype(
com.google.protobuf.Descriptors.MethodDescriptor method) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.getResponsePrototype() given method " +
"descriptor for wrong service type.");
}
switch(method.getIndex()) {
case 0:
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
};
}
/**
* <code>rpc getRowCount(.getRowCountRequest) returns (.getRowCountResponse);</code>
*/
public abstract void getRowCount(
com.google.protobuf.RpcController controller,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest request,
com.google.protobuf.RpcCallback<com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse> done);
public static final
com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptor() {
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getDescriptor().getServices().get(0);
}
public final com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptorForType() {
return getDescriptor();
}
public final void callMethod(
com.google.protobuf.Descriptors.MethodDescriptor method,
com.google.protobuf.RpcController controller,
com.google.protobuf.Message request,
com.google.protobuf.RpcCallback<
com.google.protobuf.Message> done) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.callMethod() given method descriptor for wrong " +
"service type.");
}
switch(method.getIndex()) {
case 0:
this.getRowCount(controller, (com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest)request,
com.google.protobuf.RpcUtil.<com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse>specializeCallback(
done));
return;
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public final com.google.protobuf.Message
getRequestPrototype(
com.google.protobuf.Descriptors.MethodDescriptor method) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.getRequestPrototype() given method " +
"descriptor for wrong service type.");
}
switch(method.getIndex()) {
case 0:
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public final com.google.protobuf.Message
getResponsePrototype(
com.google.protobuf.Descriptors.MethodDescriptor method) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.getResponsePrototype() given method " +
"descriptor for wrong service type.");
}
switch(method.getIndex()) {
case 0:
return com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public static Stub newStub(
com.google.protobuf.RpcChannel channel) {
return new Stub(channel);
}
public static final class Stub extends com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.hbaseEndPointTestService implements Interface {
private Stub(com.google.protobuf.RpcChannel channel) {
this.channel = channel;
}
private final com.google.protobuf.RpcChannel channel;
public com.google.protobuf.RpcChannel getChannel() {
return channel;
}
public void getRowCount(
com.google.protobuf.RpcController controller,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest request,
com.google.protobuf.RpcCallback<com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse> done) {
channel.callMethod(
getDescriptor().getMethods().get(0),
controller,
request,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.getDefaultInstance(),
com.google.protobuf.RpcUtil.generalizeCallback(
done,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.class,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.getDefaultInstance()));
}
}
public static BlockingInterface newBlockingStub(
com.google.protobuf.BlockingRpcChannel channel) {
return new BlockingStub(channel);
}
public interface BlockingInterface {
public com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse getRowCount(
com.google.protobuf.RpcController controller,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest request)
throws com.google.protobuf.ServiceException;
}
private static final class BlockingStub implements BlockingInterface {
private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
this.channel = channel;
}
private final com.google.protobuf.BlockingRpcChannel channel;
public com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse getRowCount(
com.google.protobuf.RpcController controller,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest request)
throws com.google.protobuf.ServiceException {
return (com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse) channel.callBlockingMethod(
getDescriptor().getMethods().get(0),
controller,
request,
com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse.getDefaultInstance());
}
}
// @@protoc_insertion_point(class_scope:hbaseEndPointTestService)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_getRowCountRequest_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_getRowCountRequest_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_getRowCountResponse_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_getRowCountResponse_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\022RowCountTest.proto\"\024\n\022getRowCountReque" +
"st\"\'\n\023getRowCountResponse\022\020\n\010rowCount\030\001 " +
"\001(\0032T\n\030hbaseEndPointTestService\0228\n\013getRo" +
"wCount\022\023.getRowCountRequest\032\024.getRowCoun" +
"tResponseB@\n,com.imooc.bigdata.hbase.cop" +
"rocessor.endpointB\013GetRowCountH\001\210\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_getRowCountRequest_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_getRowCountRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_getRowCountRequest_descriptor,
new java.lang.String[] { });
internal_static_getRowCountResponse_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_getRowCountResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_getRowCountResponse_descriptor,
new java.lang.String[] { "RowCount", });
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
// @@protoc_insertion_point(outer_class_scope)
}
4. TestRowCountEndPoint:
package com.imooc.bigdata.hbase.coprocessor.endpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountRequest;
import com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse;
import com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.hbaseEndPointTestService;
public class TestRowCountEndPoint extends
hbaseEndPointTestService implements Coprocessor,
CoprocessorService {
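// Context (environment) of the single region this instance is loaded on
private RegionCoprocessorEnvironment envi;
// The RPC service: return this, since an instance of this class is itself the service implementation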
@Override
public Service getService() {
return this;
}
// Coprocessors run inside regions; every region loads its own instance
// This method runs when the RegionServer opens the region (before the region is fully open)
@Override
public void start(CoprocessorEnvironment env) throws IOException {
// Check that the current environment really is a region
if (env instanceof RegionCoprocessorEnvironment) {
this.envi = (RegionCoprocessorEnvironment) env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
// This method runs when the RegionServer closes the region (before the region is fully closed)
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
}
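// Server-side implementation of the service method (runs on every region)
// The first parameter is fixed; the request and response parameters are the types declared in the proto file.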
@Override
public void getRowCount(RpcController controller,
getRowCountRequest request,
RpcCallback<getRowCountResponse> done) {
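// Row count computed on this single region (long, to match the int64 field in the proto file)
long result = 0;
// Builder for the response message
getRowCountResponse.Builder responseBuilder = getRowCountResponse.newBuilder();
// Count the rows
InternalScanner scanner = null;
try {
Scan scan = new Scan();
scanner = this.envi.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = false;
do {
// Each call to next() appends the cells of one row to the list
hasMore = scanner.next(results);
// Count only when cells were returned, so an empty region reports 0 rather than 1
if (!results.isEmpty()) {
result++;
}
// Clear the list so cells do not accumulate across iterations
results.clear();
} while (hasMore);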
} catch (IOException ioe) {
ioe.printStackTrace();
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
// nothing to do
}
}
}
responseBuilder.setRowCount(result);
done.run(responseBuilder.build());
return;
}
}
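The counting loop in getRowCount is easy to get wrong at its edges, so here is a minimal standalone sketch of the same do/while pattern. It assumes, as InternalScanner.next(List<Cell>) does, that each call appends one row's cells and returns whether more rows remain. RowScanner, scannerOver and countRows are hypothetical names used only for this illustration; they are not part of HBase.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class RowCountSketch {

    /** Hypothetical stand-in for InternalScanner; cells are plain strings here. */
    interface RowScanner {
        /** Appends one row's cells to out; returns true while more rows remain. */
        boolean next(List<String> out);
    }

    /** Builds a RowScanner over in-memory rows; each inner list is one row's cells. */
    static RowScanner scannerOver(List<List<String>> rows) {
        final Iterator<List<String>> it = rows.iterator();
        return new RowScanner() {
            @Override
            public boolean next(List<String> out) {
                if (it.hasNext()) {
                    out.addAll(it.next());
                }
                return it.hasNext();
            }
        };
    }

    /** Counts rows with the same do/while shape as the endpoint. */
    static long countRows(RowScanner scanner) {
        long count = 0;
        List<String> cells = new ArrayList<String>();
        boolean hasMore;
        do {
            hasMore = scanner.next(cells);
            // The last call can still deliver a row while returning false,
            // and an empty scan delivers nothing at all: test the list itself.
            if (!cells.isEmpty()) {
                count++;
            }
            cells.clear();
        } while (hasMore);
        return count;
    }

    public static void main(String[] args) {
        List<List<String>> rows = new ArrayList<List<String>>();
        rows.add(Arrays.asList("cf:a"));
        rows.add(Arrays.asList("cf:a", "cf:b"));
        System.out.println("rows=" + countRows(scannerOver(rows)));
        System.out.println("empty=" + countRows(scannerOver(new ArrayList<List<String>>())));
    }
}
```

Checking the list rather than the boolean is what keeps an empty region from being counted as one row, and clearing the list keeps memory flat on large regions.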
5. RowCountTest.proto:
option java_package = "com.imooc.bigdata.hbase.coprocessor.endpoint";
option java_outer_classname = "GetRowCount";
option java_generic_services = true;
option optimize_for = SPEED;
message getRowCountRequest{
}
message getRowCountResponse {
optional int64 rowCount = 1;
}
service hbaseEndPointTestService {
rpc getRowCount(getRowCountRequest)
returns(getRowCountResponse);
}
6. TestRowCount:
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount;
import com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.getRowCountResponse;
import com.imooc.bigdata.hbase.coprocessor.endpoint.GetRowCount.hbaseEndPointTestService;
public class TestRowCount {
public static void main(String[] args) throws Throwable {
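// HBaseConfiguration.create() also loads hbase-default.xml and hbase-site.xml
Configuration config = org.apache.hadoop.hbase.HBaseConfiguration.create();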
config.set("hbase.zookeeper.quorum", "127.0.0.1:2181");
Table table = null;
Connection connection = null;
// Receives the results as Map<region name, per-region result>
Map<byte[], getRowCountResponse> results = null;
try {
connection = ConnectionFactory.createConnection(config);
table = connection.getTable(TableName.valueOf("test_table"));
// Invoke the RPC and handle the return value.
Batch.Call<hbaseEndPointTestService, getRowCountResponse> callable = new Batch.Call<hbaseEndPointTestService, getRowCountResponse>() {
// Override call(): after connecting to a region, the API runs call() to issue the service request
@Override
public getRowCountResponse call(hbaseEndPointTestService instance)
throws IOException {
// Create the controller and callback per call: call() runs once per region,
// possibly in parallel, so this state must not be shared between regions
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<getRowCountResponse> rpcCallback = new BlockingRpcCallback<getRowCountResponse>();
// The server counts rows by scanning the whole region, which is slow
GetRowCount.getRowCountRequest.Builder request = GetRowCount.getRowCountRequest
.newBuilder();
// Invoke the RPC method
instance.getRowCount(controller, request.build(),
rpcCallback);
// Return this region's result directly
return rpcCallback.get();
}
};
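/**
* Table.coprocessorService(Class, byte[], byte[], Batch.Call)
* sends the request to multiple regions.
*
* The two byte[] arguments are the start and end row keys; when both are null, every region of the table is included.
* Batch.Call must be supplied by the caller; the API uses the arguments above to connect to the matching regions in parallel.
*/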
results = table.coprocessorService(hbaseEndPointTestService.class,
null, null,
callable);
// Merge the results from all regions
Collection<getRowCountResponse> resultsc = results.values();
long result = 0L;
for (getRowCountResponse r : resultsc) {
result += r.getRowCount();
}
System.out.println(String.format("row count=%s", result));
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != table) {
try {
table.close();
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
}
}
}
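The client above follows a fan-out and merge pattern: one call per region, run in parallel, with the per-region results summed at the end. The sketch below reproduces that pattern with plain JDK classes so it can be run without a cluster. RegionCall and invokeOnRegions are hypothetical stand-ins for Batch.Call and Table.coprocessorService, and region names are plain strings rather than byte[] keys.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class RegionFanOutSketch {

    /** Hypothetical per-region call, standing in for Batch.Call. */
    interface RegionCall {
        long call(String regionName);
    }

    /** Runs the call once per region in parallel; returns region name -> result. */
    static Map<String, Long> invokeOnRegions(List<String> regions, final RegionCall call) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, regions.size()));
        try {
            Map<String, Future<Long>> pending = new LinkedHashMap<String, Future<Long>>();
            for (final String region : regions) {
                // Each task keeps its own state; nothing is shared between regions.
                Callable<Long> task = new Callable<Long>() {
                    @Override
                    public Long call() {
                        return call.call(region);
                    }
                };
                pending.put(region, pool.submit(task));
            }
            Map<String, Long> results = new TreeMap<String, Long>();
            for (Map.Entry<String, Future<Long>> e : pending.entrySet()) {
                results.put(e.getKey(), e.getValue().get());
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException("region call failed", e);
        } finally {
            pool.shutdown();
        }
    }

    /** Merges the per-region counts into one total. */
    static long total(Map<String, Long> perRegion) {
        long sum = 0L;
        for (long v : perRegion.values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Made-up per-region row counts, for illustration only.
        final Map<String, Long> fake = new TreeMap<String, Long>();
        fake.put("region-a", 3L);
        fake.put("region-b", 5L);
        fake.put("region-c", 0L);
        Map<String, Long> perRegion = invokeOnRegions(
                Arrays.asList("region-a", "region-b", "region-c"),
                new RegionCall() {
                    @Override
                    public long call(String regionName) {
                        return fake.get(regionName);
                    }
                });
        System.out.println("row count=" + total(perRegion));
    }
}
```

Because the same call object is invoked once per region, anything it needs per invocation (in the real client, the RPC controller and the blocking callback) is best created inside the call rather than held as a field.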
7. Loading the coprocessor
Loading through the configuration file: set the properties below in hbase-site.xml. Coprocessors loaded this way are generally system-level.
Property | Description |
hbase.coprocessor.region.classes | RegionObservers and Endpoints |
hbase.coprocessor.wal.classes | WALObservers |
hbase.coprocessor.master.classes | MasterObservers |
<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value>
</property>
Loading through the shell: use the alter command to modify the table schema and attach the coprocessor.
alter 'CoprocessorTestTable',
'coprocessor'=>'hdfs://localhost:9000/path/coprocessor.jar|com.imooc.hbase.coprocessor.TestRegionObserver|1001|'
Loading through API code: attach the coprocessor programmatically through the client API.
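The value given to 'coprocessor' in the alter command packs several fields into one string separated by '|': the jar path, the class name, the priority, and an optional trailing argument list. A small parser makes that layout explicit. This is only a sketch: CoprocessorSpecSketch is a hypothetical helper, not an HBase class, and the key=value, comma-separated argument syntax is an assumption about the fourth field.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CoprocessorSpecSketch {
    final String jarPath;
    final String className;
    final Integer priority;            // null when the priority field is left empty
    final Map<String, String> args;

    private CoprocessorSpecSketch(String jarPath, String className,
                                  Integer priority, Map<String, String> args) {
        this.jarPath = jarPath;
        this.className = className;
        this.priority = priority;
        this.args = args;
    }

    /** Splits "path|class|priority|args" into its fields. */
    static CoprocessorSpecSketch parse(String spec) {
        // limit -1 keeps the empty trailing field produced by a final '|'
        String[] parts = spec.split("\\|", -1);
        if (parts.length < 2 || parts.length > 4) {
            throw new IllegalArgumentException("expected path|class|priority|args: " + spec);
        }
        String jarPath = parts[0].trim();
        String className = parts[1].trim();
        if (className.isEmpty()) {
            throw new IllegalArgumentException("missing class name: " + spec);
        }
        Integer priority = null;
        if (parts.length > 2 && !parts[2].trim().isEmpty()) {
            priority = Integer.valueOf(parts[2].trim());
        }
        Map<String, String> args = new LinkedHashMap<String, String>();
        if (parts.length > 3 && !parts[3].trim().isEmpty()) {
            // Assumed argument syntax: key=value pairs separated by commas
            for (String pair : parts[3].split(",")) {
                int eq = pair.indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("bad argument: " + pair);
                }
                args.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return new CoprocessorSpecSketch(jarPath, className, priority, args);
    }

    public static void main(String[] args) {
        CoprocessorSpecSketch s = parse(
            "hdfs://localhost:9000/path/coprocessor.jar"
            + "|com.imooc.hbase.coprocessor.TestRegionObserver|1001|");
        System.out.println(s.jarPath);
        System.out.println(s.className);
        System.out.println(s.priority);
    }
}
```

Reading the string this way also shows why a stray character in place of a '|' breaks loading: the jar path and the class name fuse into a single field.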
8. Build hbase-observer-test to produce hbase-observer-test-1.0-SNAPSHOT.jar.
Upload hbase-observer-test-1.0-SNAPSHOT.jar to /usr/local/hadoop.
From /usr/local/hadoop/hadoop-2.7.3/bin, run:
./hdfs dfs -mkdir /coprocessor/
./hdfs dfs -copyFromLocal /usr/local/hadoop/hbase-observer-test-1.0-SNAPSHOT.jar /coprocessor/
Change to the HBase bin directory and start the shell:
cd /usr/local/hadoop/hbase-1.2.4/bin
./hbase shell
create 'CoprocessorTest','cf'
Disable the table first, then load the coprocessor onto it:
disable 'CoprocessorTest'
Load the coprocessor:
alter 'CoprocessorTest','coprocessor'=>'hdfs://localhost:9000/coprocessor/hbase-observer-test-1.0-SNAPSHOT.jar|com.imooc.bigdata.hbase.coprocessor.observer.RegionObserverTest|1001|'
Re-enable the table:
enable 'CoprocessorTest'
Check that the coprocessor is attached:
describe 'CoprocessorTest'
Add a row to the CoprocessorTest table.
Run the following put four times:
put 'CoprocessorTest','rowkey1','cf:countCol','10'
Now read the column back. If the coprocessor works correctly, the value should be 40:
get 'CoprocessorTest','rowkey1','cf:countCol'
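Why four puts of 10 should read back as 40 can be seen in a small in-memory model. The sketch assumes that RegionObserverTest (listed earlier) uses its prePut hook to add the incoming value to the value already stored, which is what the expected result implies. AccumulatingPutSketch and its methods are hypothetical and use a HashMap in place of a region.

```java
import java.util.HashMap;
import java.util.Map;

final class AccumulatingPutSketch {
    // In-memory stand-in for a region: "row/column" -> stored value
    private final Map<String, Long> store = new HashMap<String, Long>();

    /** Hypothetical prePut-style hook: returns the value that is actually written. */
    long prePut(String cellKey, long incoming) {
        Long existing = store.get(cellKey);
        // Assumed behaviour: add the incoming value to whatever is already stored
        return existing == null ? incoming : existing + incoming;
    }

    /** A put passes through the hook before it reaches the store. */
    void put(String row, String column, long value) {
        String key = row + "/" + column;
        store.put(key, prePut(key, value));
    }

    /** Returns the stored value, or null if the cell does not exist. */
    Long get(String row, String column) {
        return store.get(row + "/" + column);
    }

    public static void main(String[] args) {
        AccumulatingPutSketch table = new AccumulatingPutSketch();
        for (int i = 0; i < 4; i++) {
            table.put("rowkey1", "cf:countCol", 10L);
        }
        System.out.println(table.get("rowkey1", "cf:countCol"));
    }
}
```

Without the hook, each put would simply overwrite the previous value and the read would return 10, so the accumulated value is a quick way to confirm the observer is actually running.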
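9. Unloading/updating a coprocessor in HBase
Loading the same Coprocessor a second time has no effect: the second instance does not take over.
To actually unload or update a coprocessor, the JVM must be restarted, which means restarting the RegionServer.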