HBase(八) HBase JAVA API - 计数器和协处理器

举报
大数据小粉 发表于 2016/11/15 14:27:03 2016/11/15
【摘要】 是否会有这样的场景:有需要测试数据的时候,不知如何生成一些已包含测试数据的文件;或者你是临时需要一个小的程序,可以让你生成不同大小文件(比如大于1Mb少于100Mb),不需要从网络上去搜寻查找如何生成,这里有一些简单的方法帮你偷懒。

是否会有这样的场景:有需要测试数据的时候,不知如何生成一些已包含测试数据的文件;或者你是临时需要一个小的程序,可以让你生成不同大小文件(比如大于1Mb少于100Mb),不需要从网络上去搜寻查找如何生成,这里有一些简单的方法帮你偷懒。

计数器

在shell一节看到过incr命令,hbase会保证计数器读取和操作的原子性。 一次只能操作一行中的计数器,可以是一个也可以是多个,多行需要多个调用。 java客户端中,HTable有两种方式操作计数器
incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
increment(Increment increment)
writeToWAL参数,默认是true,即WAL生效。前一种提供坐标可以操作单计数器,后面用Increment 实例可以操作单行的多计数器。 Increment 实例需要传入行健,Increment(byte[] row),也有行锁版本的构造函数。
增加一个计数器,实际上就是增加一列,使用addColumn(byte[] family, byte[] qualifier, long amount)方法,设置一个计数器操作。Increment 实例不同于Put,是不能设置版本的,但是可以使用setTimeRange(long minStamp, long maxStamp)方法来使老的计数器过期,即一次incr操作会认为这个计数器不存在,并设置为1。

代码示例:

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");
byte[] row = Bytes.toBytes("row0");
byte[] family = Bytes.toBytes("f1");
byte[] qualifier = Bytes.toBytes("counter");
byte[] qualifier2 = Bytes.toBytes("counter2");
// 会创建新列counter
long cnt1 = table.incrementColumnValue(row, family, qualifier, 1);
long cnt2 = table.incrementColumnValue(row, family, qualifier, 3);
long cnt3 = table.incrementColumnValue(row, family, qualifier, -2);
System.out.println("cnt1: " + cnt1 + " cnt2: " + cnt2 + " cnt3: " + cnt3);
// out: cnt1: 1 cnt2: 4 cnt3: 2
Increment increment = new Increment(row);
// [minStamp, maxStamp) 当前timestamp=1441112747878,所以改为setTimeRange(1,3)就会每次都置为1
increment.setTimeRange(3l, 1541112747878l);
increment.addColumn(family, qualifier, 1);
increment.addColumn(family, qualifier2, 1);
Result result = table.increment(increment);
for(KeyValue kv:result.raw()){
System.out.println(kv.toString()+"||"+Bytes.toLong(kv.getValue()));
}
table.close();

协处理器

协处理器的概念有点像Oracle的Trigger和PLSQL存储过程,用户可以在region server端进行region级的代码操作。用户不关心具体再哪里执行,hbase分布式框架会隐式处理。使用方式是监听一些隐式的事件,用钩子完成一些操作,或者自己扩展现有的RPC协议来引入自己的调用。使用场景包括维护辅助索引、维护数据间的引用完整性,权限控制等。
协处理器最底层的接口是Coprocessor,所有的Coprocessor都可以串起来使用,用法是类似servlet的过滤器的,实际上就是个责任链模式。 基于Coprocessor这个接口,hbase提供两类协处理器接口,就是对应上面说的Trigger和PLSQL存储过程概念:
observer:类似Trigger,提供一些hook,在服务端的特定事件发生时触发,这些事件包括用户调用产生的事件,也包含hbase服务端内部事件。 observer有几个基础接口:
ü RegionObserver:监听和处理表的region数据修改事件
ü MasterObserver:监听和处理集群级的事件,如管理或DDL类操作
ü WALObserver:控制WAL的钩子
endpoint:类似存储过程。动态扩展RPC协议,把用户自定义的操作添加到服务端。

Coprocessor接口

Coprocessor接口内部定义了一个枚举Coprocessor.State,定义了Coprocessor的生命周期的各个状态。

UNINSTALLED 最初状态,没有环境,也没有初始化
INSTALLED 实例装载了环境参数
STARTING 即将开始操作,start()方法调用前
ACTIVE start()方法调用后
STOPPING Stop()调用前
STOPPED Stop()方法将控制权交给框架

Coprocessor有优先级的概念,重要的级别是PRIORITY_SYSTEM和PRIORITY_USER,前者会在后者前执行。 Coprocessor只定义了两个方法start(CoprocessorEnvironment env) 和stop(CoprocessorEnvironment env) ,框架会控制Coprocessor的生命周期。
CoprocessorEnvironment 是Coprocessor生命周期中的环境参数,Coprocessor实例也保存在这个环境参数中,调用CoprocessorEnvironment. getInstance()方法得到这个实例。Coprocessor也只和他的环境参数做交互。协处理器不应该使用HTable实例,而应该使用CoprocessorEnvironment.getTable()方法访问表数据,这是因为Coprocessor不应该对行数据枷锁等原因,这个方法在HTable基础上做了限制。0.94版本还没有什么强制措施这样去限制,目前还是靠开发人员自己去约束。

下图是协处理器在每个region中的执行顺序描述

RegionObserver

pending open:

region打开前会调用preOpen(ObserverContext c) region打开并通知master后调用postOpen(ObserverContext c)
preOpen可以通知框架,控制本次打开操作是否放弃,postOpen可以执行预热或其他操作。
pending
open和open之间,region server会检查是否从WAL恢复数据到region中,这前后会触发preWALRestore()/ postWALRestore()两个方法。可以控制WAL重做时哪些修改要提交,也可以监督哪些记录被修改。
open:
region
打开后被加载到region
server并正常工作时,这个region就是open状态。这里面可以监控客户端API调用事件,如 preGet/postGet,preScannerOpen/postScannerOpen等等,具体可以查看RegionObserver的 API。
还有一些服务端事件,如preCompact/postCompact。
pending close:
preClose(ObserverContext c,
boolean abortRequested)和postClose是对应的hook,其中abortRequested表示了本次是否是正 常关闭,如果是true,表明是abort操作。
RegionCoprocessorEnvironment:
所有hook方法的第一个参数都是这个接口泛型的ObserverContext,它继承了CoprocessorEnvironment接口的。这个接 口新定义了三个方法,getRegion() 返回当前正在管理的HRegion
实 例,getRegionServerServices() 返回共享的RegionServerServices
实 例,getSharedData()返回当前所有coprocessor共享的数据。
RegionServerServices
实际上就是HRegionServer提供的一些方法,如getWAL() 方法返回HLog实例,可以访问 WAL;getFlushRequester() 方法返回FlushRequester实例,可以用来执行memstore的flush操 作。其他方法可以参考RegionServerServices接口的API。
ObserverContext,
所有Coprocessor执行时是,这个上下文实例都是一样的,只是包装了不同的 RegionCoprocessorEnvironment,Environment可以通过getEnvironment() 方法返回。
ObserverContext还有两个重要的上下文方法是bypass()和complete(),bypass()调用后,框架将使用用户提供的值, 而不使用框架通常使用的值;complete()表示执行链上后面的处理可以跳过,剩下没执行的协处理器也可以跳过,即当前协处理器的响应是最后一个协处理器。
bypass的一种用法是可以阻止当前事件的处理,如停止region的自动拆分:
public void preSplit(ObserverContext e)
{
e.bypass();
};
BaseRegionObserver是采用了swing的类似思路,实现了所有RegionObserver接口的空方法,默认情况下,这个类是没有任何功能的。可以继承后,选择感兴趣的方法进行重载。
public class RegionObserverDemo extends BaseRegionObserver {
public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");
@Override
public void preGet(final ObserverContext e, final Get get,
final List results) throws IOException {
if (Bytes.equals(get.getRow(), FIXED_ROW)) {
KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW, Bytes.toBytes(System.currentTimeMillis()));
results.add(kv);
e.bypass();
}
}
}
MasterObserver

MasterObserver处理master服务器的所有hook,主要是DDL操作和整体的一些操作。
方法命名也是preXX和postXX的风格,但是有两个特殊的preShutdown(...)和preStopMaster(...),前一个是集群关闭前的hook,后一个是master进程关闭前的hook,这两个是没有对应的post函数的。

DDL类的hook有:

void preCreateTable(...) / void postCreateTable(...)
void preDeleteTable(...) / void postDeleteTable(...)
void preModifyTable(...) / void postModifyTable(...)
void preAddColumn(...) / void postAddColumn(...)
void preModifyColumn(...) / void postModifyColumn(...)
void preDeleteColumn(...) / void postDeleteColumn(...)

集群类操作的有:

void preEnableTable(...) / void postEnableTable(...)
void preDisableTable(...) / void postDisableTable(...)
void preMove(...) / void postMove(...)
void preAssign(...) / void postAssign(...)
void preUnassign(...) / void postUnassign(...)
void preBalance(...) / void postBalance(...)
boolean preBalanceSwitch(...) / void postBalanceSwitch(...)
void preShutdown(...)
void preStopMaster(...)

方法参数结构和RegionObserver是类似的,第一个参数也是ObserverContext,是如preModifyTable(ObserverContext ctx, byte[] tableName, HTableDescriptor htd),MasterCoprocessorEnvironment也是继承了CoprocessorEnvironment接口,特殊的私有方法是MasterServices getMasterServices()

MasterServices 实例提供了一些master端的服务实例,这些功能要慎用:
AssignmentManager getAssignmentManager():AssignmentManager实例负责所有的region分配操作,如分配、卸载、负载均衡
MasterFileSystem getMasterFileSystem():MasterFileSystem提供一个与master操作相关的文件系统抽象,如创建表或日志文件的目录
ServerManager getServerManager():ServerManager可以访问所有服务,包括激活和去激活状态。
ExecutorService getExecutorService():ExecutorService用于master系统调度
void checkTableModifiable(byte[] tableName)检查表是否已存在、已离线

类似的,也有个BaseMasterObserver辅助类

public class MasterObserverDemo extends BaseMasterObserver {

@Override
public void preEnableTable(ObserverContext ctx, byte[] tableName) throws IOException {
System.out.println("preEnableTable: "+Bytes.toString(tableName));
super.preEnableTable(ctx, tableName);
}

@Override
public void postEnableTable(ObserverContext ctx, byte[] tableName) throws IOException {
System.out.println("postEnableTable: "+Bytes.toString(tableName));
super.postEnableTable(ctx, tableName);
}

@Override
public void preDisableTable(ObserverContext ctx, byte[] tableName) throws IOException {
System.out.println("preDisableTable: "+Bytes.toString(tableName));
super.preDisableTable(ctx, tableName);
}
}
Endpoint

实现一个endpoint需要两步:
1、 扩展CoprocessorProtocol接口;定义与endpoint的通信协议,其实就是客户端和服务端间的RPC协议。
2、 扩展BaseEndpointCoprocessor类;实现上面自定义协议的接口方法和抽象类BaseEndpointCoprocessor的方法。

HTable有几个方法,可以得到CoprocessorProtocol实例,进而与定义的endpoint通信
Map coprocessorExec(Class protocol, byte[] startKey, byte[] endKey, Batch.Call callable)
void coprocessorExec(Class protocol, byte[] startKey, byte[] endKey, Batch.Call callable, Batch.Callback callback)
T coprocessorProxy(Class protocol, byte[] row)
这几个方法要么定义了起止键,要么直接给了行健,这是因为CoprocessorProtocol要和表中的region关联。org.apache.hadoop.hbase.client.coprocessor. Batch.Call是一个回调的接口,因为coprocessorExec方法可能涉及多个region,所以每个匹配的region都会调用一次这个接口的R call(T instance)方法,方法参数就是CoprocessorProtocol实例,可以调用定义好的协议方法。Batch.Callback callback的会执行update(byte[] region, byte[] row, R result) ,result是Batch.Call的call方法返回结果。代理方法实际就是拿到了一个CoprocessorProtocol实例,直接调用协议方法即可。这里用语言解释比较难懂,看一个代码的例子会比较好理解。
定义协议:
public interface RowCountProtocol extends CoprocessorProtocol {

long getRowCount() throws IOException;
long getRowCount(Filter filter) throws IOException;
long getKeyValueCount() throws IOException;
}
服务端的endpoint实现:
public class RowCountEndpoint extends BaseEndpointCoprocessor implements RowCountProtocol {
private long getCount(Filter filter, boolean countKeyValues) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(1);
if (filter != null) {
scan.setFilter(filter);
}
RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment();
// use an internal scanner to perform scanning.
InternalScanner scanner = environment.getRegion().getScanner(scan);

int result = 0;

try {
List curVals = new ArrayList();
boolean done = false;
do {
curVals.clear();
done = scanner.next(curVals);
result += countKeyValues ? curVals.size() : 1;
} while (done);
} finally {
scanner.close();
}
return result;
}

@Override

public long getRowCount() throws IOException {
return getRowCount(new FirstKeyOnlyFilter());
}

@Override

public long getRowCount(Filter filter) throws IOException {
return getCount(filter, false);
}

@Override

public long getKeyValueCount() throws IOException {
return getCount(null, true);
}
}
客户端调用:

public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");
try {
// 这里起止键为空,统计表的所有行
Map results = table.coprocessorExec(RowCountProtocol.class, null, null,
new Batch.Call() {
// 也可以用Batch.forMethod通过反射方法预先定义Batch.Call,但不够灵活,不展开

@Override
public Long call(RowCountProtocol counter) throws IOException {
// 这里实际执行endpoint的方法
return counter.getRowCount();
}
});
long total = 0;
for (Map.Entry entry : results.entrySet()) {
total += entry.getValue().longValue();
System.out.println("Region: " + Bytes.toString(entry.getKey()) + ", Count: " + entry.getValue());
}
System.out.println("Total Count: " + total);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
table.close();
}
协处理器的加载
首先要把自定义的代码打成jar上传到各个服务器,并在hbase-env.sh配置classpath
export HBASE_CLASSPATH=/opt/netwatcher/pm4h2/app/opt/hbase_derek.jar
协处理器要在hbase-site.xml中分类配置,多个类用逗号分隔,类的顺序就是执行顺序:

hbase.coprocessor.region.classes
coprocessor.RegionObserverDemo,coprocessor.RowCountEndpoint


hbase.coprocessor.master.classes
coprocessor.MasterObserverDemo

重启hbase使配置生效。
查看之前的协处理器是否生效:
regionObserver:
hbase(main):001:0> get 't1','@@@GETTIME@@@'
COLUMN CELL
@@@GETTIME@@@:@@@GETTIME@@@ timestamp=9223372036854775807, value=\x00\x00\x01O\xB4\xFD\x03x
1 row(s) in 0.5840 seconds
masterObserver:
hbase(main):001:0> disable 't1'
0 row(s) in 2.6950 seconds
在hbase-acrosspm-master-pmapp1.log日志中可以看到插入的日志:
2015-09-10 10:11:46,968 WARN coprocessor.MasterObserverDemo: derek-test======preDisableTable: t1
2015-09-10 10:11:46,986 INFO org.apache.hadoop.hbase.master.handler.DisableTableHandler: Attemping to disable table t1
2015-09-10 10:11:47,000 INFO org.apache.hadoop.hbase.master.handler.DisableTableHandler: Offlining 1 regions.
2015-09-10 10:11:48,011 INFO org.apache.hadoop.hbase.master.handler.DisableTableHandler: Disabled table is done=true
endpoint客户端main函数输出:
Region: t1,,1439347624316.5f2df32e5af7eaedb49133aa6cf65b7e., Count: 5
Total Count: 5

作者 | 林钰鑫

转载请注明出处:华为云博客 https://portal.hwclouds.com/blogs

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200