孙宇的技术专栏 大数据/机器学习

HBase

2016-09-05

阅读:


HBase

HBase 基于 hadoop 和 zookeeper。先安装JAVA环境

HBase作为一个典型的NoSQL数据库,可以通过行键(Rowkey)检索数据,仅支持单行事务,主要用于存储非结构化和半结构化的松散数据。与Hadoop相同,HBase设计目标主要依靠横向扩展,通过不断增加廉价的商用服务器来增加计算和存储能力。

HBase有一些显著的特点:

  1. 容量大。HBase 单表可达百亿行、百万列。
  2. 面向列。传统数据库是按行存储,在没有索引的时候查询非常慢。对于列存储,每一列存储在一起,它就相当于是索引。当只需要访问某些列时,IO会非常少。
  3. 稀疏性。传统行存储中,通常会有大量的NULL列存在,但它们也占用空间。这会造成浪费。但在列存储中,这些列并不占用空间。
  4. 扩展性/可靠性。HBase 基于 Hadoop,扩展性强。
  5. 高性能。基于底层的LSM数据结构和Rowkey有序排列的特性。使得写入性能非常高。

使用场景/成功案例

搜索引擎

爬虫从互联网上抓取各网页的内容,然后放到 Hbase 中,再通过 MapReduce 对数据进行索引的构建。 用户在搜索引擎上搜索时,先将搜索词进行分词,然后获取各个分词索引的内容,再按各内容的匹配度得分进行排序,返回最符合的答案。

统计平台

诸如百度统计,CNZZ等或者公司内容用来监控服务器集群信息的平台。将各机器各指标增量的存入HBase。查询速度快,可扩展性高。

广告平台

诸如百度广告联盟,谷歌广告联盟。各广告的展现、点击数据统计。及用户行为的事件跟踪。

社交平台

诸如 Facebook、Twitter,用户量巨大的社交平台,要存储大量的用户及用户帖子及评论相关内容。传统的数据库已经无法满足扩展性和性能的要求。

短信系统

诸如平台的站内信系统,及聊天工具的信息发送系统。

安装

cd /data/soft/
wget http://archive.apache.org/dist/hbase/hbase-0.94.9/hbase-0.94.9.tar.gz
tar -zxf hbase-0.94.9-bin.tar.gz
mv hbase-0.94.9 /usr/local/hbase

注意:下载的时候一定要注意版本。后面通过JAVA进行客户端连接时,如果客户端 jar 和服务端的版本不一致,会连接失败。 当安装过一个版本,想安装另一个版本时,一定要把 /tmp/hbase* 目录及文件全部删掉

修改 /etc/profile 添加:

export HBASE_HOME=/usr/local/hbase

启动 hbase:

/usr/local/hbase/bin/start-hbase.sh
starting master, logging to /usr/local/hbase/bin/../logs/hbase-root-master-localhost.out

启动后,16010 端口会开始监听。 同时,会自动启动 zookeeper,它是监听 2181 端口

可以访问管理端:http://192.168.192.110:60010/master-status

hbase 的配置信息主要在两个文件里: hbase-env.sh, hbase-site.xml 里。这两个文件在安装文件的解压目录。即:/usr/local/conf/。

在单机模式的默认设置里,HBase把数据写到了 /tmp 下,但此处不宜久留。我们可以更改 hbase-site.xml 文件,添加下面的配置来修改数据目录:

<property>
	<name>hbase.rootdir</name>
	<value>/data/hbase/</value>
</property>

改完后,重新启动 hbase(执行 /usr/local/hbase/bin/stop-hbase.sh 和 /usr/local/hbase/bin/start-hbase.sh)。启动后,/data/hbase/目录下会有数据文件和目录。

hbase shell

可以通过命令行和hbase进行交互。

/usr/local/hbase/bin/hbase shell

HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.3, r72bc50f5fafeb105b2139e42bbee3d61ca724989, Sat Jan 16 18:29:00 PST 2016

hbase(main):001:0>list
TABLE
0 row(s) in 0.2290 seconds

=> []
hbase(main):001:0> quit

HBase 使用表作为存储数据的结构。和关系数据库(MySQL, Oracle 等)的行存储不同的是,HBase 使用的是列存储。

创建表

hbase(main):001:0> create 'mytable', 'cf'
0 row(s) in 1.5430 seconds

=> Hbase::Table - mytable

hbase(main):002:0> list
TABLE
mytable
1 row(s) in 0.0120 seconds

=> ["mytable"]

写入数据

现在往表中写入字符串: hello HBase。即:往表 mytable 的 first 行中的 cf:message 列对应的数据单元写入字符串: hello HBase。命令如下:

hbase(main):005:0> put 'mytable', 'first', 'cf:message', 'hello HBase'
0 row(s) in 0.0060 seconds

多插入几条:

hbase(main):005:0> put 'mytable', 'second', 'cf:foo', '0x0'
0 row(s) in 0.0080 seconds

hbase(main):005:0> put 'mytable', 'third', 'cf:bar', '3.14159'
0 row(s) in 0.0130 seconds

现在表里有三条数据了。要注意的是:插入的时候的列名是我们事先没有定义的。而且也没有定义列的字段类型。

读取数据

HBase有两种读取数据的方式: get 和 scan。scan 是取多条,类似遍历,会返回所有的数据。 和插入数据的 put 对应,取单条数据就用 get。

hbase(main):010:0> get 'mytable', 'first'
COLUMN                                    CELL
 cf:message                               timestamp=1465973297264, value=hello HBase
1 row(s) in 0.0350 seconds

HBase 出来的值还带有一个时间戳。因为它可以为每个数据单元存储多个时间版本。版本默认是 3 个,但是可以重新设置。读取时,除非特别指定,否则默认返回的是最新时间的版本。如果不希望存储多个时间版本,可以设置 HBase 只存储一个版本,但千万不要禁用该功能。

hbase(main):011:0> scan 'mytable'
ROW                                       COLUMN+CELL
 first                                    column=cf:message, timestamp=1465973297264, value=hello HBase
 second                                   column=cf:foo, timestamp=1465973405764, value=0x0
 third                                    column=cf:bar, timestamp=1465973412731, value=3.14159
3 row(s) in 0.0210 seconds

scan 返回了所有的数据行。而且行的顺序是按行的名字排的。

HBase 使用坐标来定位表中的数据。行键是第一个坐标,下一个是列族。列族用做坐标时,表示一组列。再下一个坐标是列限定符。

上面示例中,first, second, third 是行键;cf 是列族,它可以包括许多限定符(message, foo, bar)。 行键最好是用独一无二的值来充当。

实例

创建表

hbase(main):006:0> create 'users', 'info'
0 row(s) in 1.3100 seconds

=> Hbase::Table - users

users 是表的名称,info 是列族,它里面可以存很多列限定符。 创建表时至少要指定一个列族。 表创建后,列族是可以更改的,但比较麻烦。

查看表结构

hbase(main):008:0> list
TABLE
mytable
users
2 row(s) in 0.0110 seconds

=> ["mytable", "users"]

hbase(main):009:0> describe 'users'
Table users is ENABLED
users
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL
 => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0140 seconds

插入数据

put 'users', 'finetkx', 'info:email', 'finetkx@alopecia.com'
put 'users', 'finetkx', 'info:name', 'Feny Ok'
put 'users', 'finetkx', 'info:password', 'abc123'
put 'users', 'finetkx', 'info:user', 'finetkx'


put 'users', 'testuser2', 'info:email', 'testuser2@alopecia.com'
put 'users', 'testuser2', 'info:name', 'Tony anthony'
put 'users', 'testuser2', 'info:password', 'abc123'
put 'users', 'testuser2', 'info:user', 'testuser2'


put 'users', 'areyousure', 'info:email', 'areyousure@alopecia.com'
put 'users', 'areyousure', 'info:name', 'Any Hais'
put 'users', 'areyousure', 'info:password', 'abc123'
put 'users', 'areyousure', 'info:user', 'areyousure'


put 'users', 'howareyou', 'info:email', 'howareyou@abc.com'
put 'users', 'howareyou', 'info:name', 'Sun Yu'
put 'users', 'howareyou', 'info:password', 'abc123'
put 'users', 'howareyou', 'info:user', 'howareyou'

JAVA-HBase案例

开发环境

开发IDE是 eclipse, 操作系统是MAC OS, JAVA 版本是 1.7 HBase 服务是安装在虚拟机上的 CentOS 6.4, JAVA 版本是 1.7, 软件环境是 HBase 0.94, IP 是 192.168.192.110

虚拟机设置 hostname

编辑文件: 
vi /etc/sysconfig/network

NETWORKING=yes
HOSTNAME=vitual_box

保存文件后,执行命令:
hostname vitual_box

虚拟机设置 hosts

192.168.192.110 localhost vitual_box

vitual_box 是虚拟机的 hostname

HBase服务配置

理更改文件 hbase-site.xml 如:
vi /usr/local/hbase-0.94.9/conf/hbase-site.xml

<configuration>
<property>
	<name>hbase.zookeeper.quorum</name>
	<value>192.168.192.110</value>
</property>
<property>
        <name>hbase.rootdir</name>
        <value>/data/hbase-0.94/</value>
</property>
</configuration>

这一步应该不设置也行。

本地机器更改 hosts:

192.168.192.110 vitual_box localhost

vitral_box 是虚拟机的 hostname 将本地 localhost 也配置成虚拟机的IP

测试代码

  1. 新建 JAVA 项目
  2. 下载相关依赖包 包有: apache-commons-lang.jar apache-logging-log4j.jar com.google.protobuf-2.4.0.jar commons-configuration-1.7.jar google-collect-1.0.jar hadoop-core-1.0.3.jar hbase-0.94.9.jar joda-time-2.0.jar org-apache-commons-logging.jar zookeeper.jar

  3. 测试代码 testHBase.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class testHbase {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "192.168.192.110");
		HBaseAdmin admin = new HBaseAdmin(conf);

		// 如果表 user 存在,遍历表,取出各行的值
		// 如果表 user 不存在,新建表
		if (admin.tableExists("users")) {
			System.out.println("User table already exists.lists table users");

			byte[] INFO_FAM = Bytes.toBytes("info");

			byte[] USER_COL = Bytes.toBytes("user");
			byte[] NAME_COL = Bytes.toBytes("name");
			byte[] EMAIL_COL = Bytes.toBytes("email");
			byte[] PASS_COL = Bytes.toBytes("password");

			// 连接池
			HTablePool pool = new HTablePool();

			HTableInterface users = pool.getTable("users");

			// 添加数据
			Put p = new Put(Bytes.toBytes("javauser"));
			p.add(INFO_FAM, USER_COL, Bytes.toBytes("javauser"));
			p.add(INFO_FAM, NAME_COL, Bytes.toBytes("javauser"));
			p.add(INFO_FAM, EMAIL_COL, Bytes.toBytes("javauser@abc.com"));
			p.add(INFO_FAM, PASS_COL, Bytes.toBytes("abc123"));

			users.put(p);

			Scan s = new Scan();
			s.addFamily(INFO_FAM);

			// 遍历表 users
			ResultScanner results = users.getScanner(s);
			for (Result r : results) {
				// 获取列族 info 中的各个限定符的值
				String name = Bytes.toString(r.getValue(INFO_FAM, NAME_COL));
				String email = Bytes.toString(r.getValue(INFO_FAM, EMAIL_COL));
				String user = Bytes.toString(r.getValue(INFO_FAM, USER_COL));
				String password = Bytes
						.toString(r.getValue(INFO_FAM, PASS_COL));

				System.out.println(name + "==" + email + "==" + user + "=="
						+ password + "\r\n");
			}

			// 删除数据
			Delete d = new Delete(Bytes.toBytes("javauser"));
			users.delete(d);

			users.close();
		} else {
			// 创建表 users
			System.out.println("Creating User table...");
			HTableDescriptor desc = new HTableDescriptor("users");
			// 创建列族 info
			HColumnDescriptor c = new HColumnDescriptor("info");
			desc.addFamily(c);
			admin.createTable(desc);
			System.out.println("User table created.");
		}
	}
}

运行后输出:

Any Hais==areyousure@alopecia.com==abc123

Feny Ok==finetkx@alopecia.com==abc123

Sun Yu==howareyou@abc.com==abc123

javauser==javauser@abc.com==javauser==abc123

Tony anthony==testuser2@alopecia.com==abc123

执行完后,再 scan 一次,发现 javauser 用户已经没有了,因为代码中已经做了删除处理。

或者通过 maven 创建项目。pom.xml 里的值如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <groupId>sunyu.bigdata</groupId>
  <artifactId>hbase</artifactId>
  <version>1.0.0</version>
  <name>TwitBase</name>
  <url>http://www.manning.com/dimidukkhurana/</url>
  <description>TwitBase is a running example used throughout HBase In Action</description>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <source>1.3</source>
          <target>1.3</target>
        </configuration>
      </plugin>
      <plugin>  
        <artifactId>maven-resources-plugin</artifactId>  
        <version>2.4.3</version>  
        <configuration>  
            <encoding>utf-8</encoding>  
        </configuration>  
      </plugin> 
      <plugin>  
        <artifactId>maven-surefire-plugin</artifactId>  
        <version>2.12.4</version>  
        <configuration>  
            <encoding>utf-8</encoding>  
        </configuration>  
      </plugin> 
      <plugin>  
        <artifactId>maven-jar-plugin</artifactId>  
        <version>2.4</version>  
        <configuration>  
            <encoding>utf-8</encoding>  
        </configuration>  
      </plugin> 
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>apache release</id>
      <url>https://repository.apache.org/content/repositories/releases/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase</artifactId>
      <version>0.94.9</version>
      <exclusions>
        <exclusion>
          <artifactId>maven-release-plugin</artifactId>
          <groupId>org.apache.maven.plugins</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- Hadoop requires commons-io but doesn't list it as an explicit
         or transient dependency. include it manually. -->
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.1</version>
    </dependency>
  </dependencies>
</project>

说明

HBase 是无模式的,我们在使用前不用把每个列限定符都定义出来。只用定义列族就行了。也不用指定数据类型。

修改数据和存储新数据一样,用 put 方式即可。

HBase写数据机制

HBase接到写数据命令(新增或编辑)时,将变化信息存储。写入失败时则抛出异常。

默认会写到两个地方:预写式日志(HLog)和 MemStore。以保证数据的持久化。只有当两个地方都写入了,写入动作才算完成。

MemStore 是内存里的写入缓冲区,Hbase中数据在永久写入磁盘之前是在这里的。当 MemStore 写满后,数据会写到磁盘,并形成一个新的 HFile。HFile 和列族有对应关系,一个列族可以有多个 HFile 文件,但一个 HFile 文件不能跨列族存储。

在集群的每个节点上,每个列族都有一个 MemStore。

在分布式结构中,如果还没写入 HFile服务器就崩溃了,那数据就丢失了。为了应对这种情况,HBase 会在写动作完成前先写入一个日志文件。集群中每台服务器都会有一个日志来记录所有的数据变化。如果写入日志都失败了,那么这次写操作会被认为失败。

服务器可以通过回放日志里的操作来进行数据恢复。

一个服务器只有一个日志,所有的列族共用该日志文件。

可以禁用写入日志,但这会面临数据丢失的风险。禁用方式是:

Put p = new Put(Bytes.toBytes("javauser"));
p.setWriteToWAL(false);

HBase读数据机制

从读取速度上来看,理想状态是所有数据都在内存中。

HBase 的读取必须连接HFile 和 MemStore 。它会通过 LRU(最近最少使用算法),把最常读取的数据保存到内存中(Block Cache)。每个列族都有自己的Block Cache。

BlockCache 中的 Block 是HBase从硬盘完成一次读取的数据单位。HFile 里存放的是 Block 的序列及索引。所以,从 HBase 里读取数据,首先要从 HFile 里读取索引并查到 Block ,再从硬盘中读取该 Block的内容。

Block 默认的大小是 64K,可以自己定义。Block 太小,会使索引增大,于是会增大内存消耗。如果Block太大,每次在Block里查找需要的数据时就会消耗更多的时间。

从HBase里读取一行,过程是:

  1. 检查 MemStore 的等待修改队列
  2. 检查 BlockCache 看包含该行的Block最近是否被缓存过
  3. 访问硬盘上的HFile

注意:HFile里存放着某个时刻MemStore的快照。一个完整的行可能包含大多个 HFile 里。

HBase删数据机制

delete 命令不是马上删除数据,它只是给数据打上一个标记。被标记的数据不会在 get 和 scan 命令中返回。

这是因为 HFile 文件是不能改变的,MemStore 每次会生成新的 HFile,但旧的是不会被改的。只有当执行文件合并的时候这些被标记的数据才会被处理掉。

文件合并分为两种:大合并小合并。两者都会重整存储在 HFile 里的数据。

小合并把多个小 HFile 合并生成一个大文件。

大合并会同时处理一个列族的专有 HFile 文件。

可以在 shell 中手动触发整个表或者特定范围的大合并。这个动作会非常耗时,不要经常使用。小合并是轻量级的,可以经常使用。

大合并是清理被删记录的唯一机会。因为一个列族可以被记在多个HFile里,但我们不能保证小合并处理的HFile 里就包括了一个信息的所有列。

HBase数据时间版本机制

HBase 会为每个值保存多个时间版本(默认是 3 个,可以设置),也就是说,我们在更新一个值时,旧的值还是会被保留。

设置方式是:

HColumnDescriptor c = new HColumnDescriptor("列族名字");
c.setMaxVersions(1);

我们在GET值时,可以提供时间版本来获得对应版本的值。如果不提供,则默认使用当时时间的值。

如果一个单元的版本超过最大数量,多出的记录会在下一次大合并的时候被丢掉。在实际应用中,可以将该值设置得足够大,以满足业务场景的需要。比如设置成几千万,几十亿。这样,就可以实现RDBS里的一对多的结构。比如用户的操作记录等。

我们也可以通过 deleteColumns() (带 s )删除小于指定时间版本的所有值。若不指定时间,则默认使用当前的时间。deleteColumn() 只删除指定的一个版本的值。

综上所述,HBase 里的数据相当于是一个四维的数组:表名[row_key][列族][列限定符][时间版本]

HBase表扫描

HBase 没有类似 query 的查询命令。要进行范围查找,只能通过 scan 命令进行扫描,然后再使用过滤器(filter)缩小结果集。

扫描缓存

在HBase的设置里,可以设置每次扫描返回多少条数据。设置方式是在 hbase-site.xml 里设置:

<property>
        <name>HBase.client.scanner.caching</name>
        <value>10</value>
</property>

也可以在JAVA代码中的扫描对象上进行 setCaching(int) 进行设置。

如果缓存设置为N,每次RPC调用扫描就会返回N条数据。默认该值是 1。可以调大一点。如果设置的值过大,会导致扫描超时。

扫描过滤器

扫描时可以使用一些常用的过滤器,也可以自己实现一个。

事务

HBase 不是一个ACID兼容的数据库。但它也有自己的一些特性:

  1. 操作是低级的,不可分的。比如给某行进行 put() ,要么整体成功,要么会回到操作前的状态,不会出现部分内容成功。
  2. 行间操作不是原子的。单行的操作是原子性的。
  3. 对于多个写操作,每个写操作是原子的。多个操作之间是独立的。
  4. 对于给定行的 get() 操作,返回系统当时所保存的完整行。
  5. 全表扫描不是对某个时间点表的快照的扫描。如果扫描已经开始,还没读到第N行,但这此第N行被改变了。扫描器读出的是被更改后的值。

HBase基本命令

Get

Put

Delete

Scan

Increment

HBase表设计

建模策略

建表之前我们通常要考虑如下内容:

  1. 这个表应该有多少个列族
  2. 列族使用什么数据
  3. 每个列族应该有多少列
  4. 列名应该是什么?虽然写数据时不用考虑列名,但读取时要知道列名
  5. 单元存放什么数据
  6. 每个单元存放多少个时间版本
  7. 行键结构是什么?

案例分析

前面建立了用户表 user 及基本信息的列族 info, 并添加了几个列限定符:user, email, name, password。用来存放用户的基本信息。 现在要添加新的功能:存放用户之间的关联信息。即类似微博里的关注功能。

常见的需求有:

  1. 用户A关注了谁?
  2. 用户A关注了B吗?
  3. 谁关注了用户A?
  4. 用户B关注了用户A吗?

分析: 一个列族,在物理上会放在一起存储,它可能是多个HFile文件。理想状态下是可以合并成一个大文件的。在这种性质下,我们应该将不同访问模式的数据放在不同的列族中。

方案 v1.0

新建一个表 follow,每个用户存为一行,行键是用户的ID。每个他关注的人存为一列,所有的列在一个列族里:

行键 userid 列族:follows
9527 1: 张三 2: 李四 3: 王五 4:赵六 5:孙七
  1. 问题一:用户A关注了谁? 只要知道A的ID,以它作为行键,去获得该行的数据,然后遍历所有的列就得到了他关注的所有人。

  2. 问题二:用户A关注了B吗? 将上一步得到的用户列表遍历一次,就可以知道是否关注了B。

  3. 问题三:谁关注了用户A? 该功能需要遍历所有的行,然后从各行的列族中去查找是否有A。显然不可行。效率极低。

  4. 问题四:用户B关注了用户A吗? 该功能和问题二一样,只用遍历B的各列即可。

所以,当前这种方案虽然能解决读取数据的基本需求,但性能上有严重缺陷。

方案 v2.0

下面再看当数据发生变更时的情况,就是写数据的需求:

  1. 用户A关注某人
  2. 用户取消关注某人

当新关注一个人时,需要往列族 follows 里添加一列。目前,列限定符是用的自增数,但如果不遍历该用户所有的列,不知道当前ID自增到多少了。所以为了方便,我们需要在行上额外添加一个用来保存当前自增数的列限定符。如:

行键 userid 列族:follows
9527 1: 张三 2: 李四 3: 王五 4:赵六 5:孙七count:5
1024 1: 张三 2: 李四 3: 王五 count:3 -- --

当用户新关注一个人时,处理流程是:先从表中读取计数,添加用户,更新计数器的值。看上去象关系数据库中的事务。但由于 HBase 是不支持事务的,所以这个过程就会有问题。

如果用户打开多个窗口,同时关注不同的人。很有可能两次操作读取的计数器是同一个值,导致一个列限定符的数据会覆盖另一个的。

为了应对这种情况,显然计数器是一定不能存在的。而且,列限定符也不能用自增的值来充当。我们直接用关注用户的ID来作列限定符,这时的数据如下:

行键 userid 列族:follows
9527 张三:1 李四:1 王五:1 赵六:1 孙七:1
1024 张三:1 李四:1 王五:1 -- --

可以看到,直接用关注的用户作为列限定符,而列的值则可以存其它东西,如果没有什么要存的,就直接存为 0 或 1。因为数据不能为空。而用用户名作为列限定符的另一个好处是,用户名是唯一的,不会担心数据被覆盖。

我们可以这么干,就是因为HBase 的列族是可以无限扩展,且不用事先定义

该方案有个问题:

用户关注的列表是不平均的,有的人关注很多人,有的关注的人少。所以,各行的列长度可能会有非常大差距。

在程序上,查询A是否关注了B,只需要遍历A的列,看是否有B即可。但在性能上,查一行数据的开销如下:

  1. 查找 region
  2. 在 MemStore 里定位 KeyValue,如果它存在,则查找在哪个 region
  3. 在HFile 里查找在哪个文件块中。
  4. 如果文件已经被刷到硬盘上了,则扫描具体的位置。

在访问HBase的数据时,决定性因素是扫描HFile数据块时找到相关KeyValue对象所花费的时间。如果使用上面那种宽行,扫描过程会增加处理整行的开销。

结论就是:访问宽行比窄行开销大。同样的信息,我们可以使用高表形式。

方案 v3.0

利用高表的形式,数据示例如下:

行键 列族:f
张三+李四 昵称一:1
王五+赵六 昵称二:1

把用户名昵称放在列限定符中的好处是可以节省在功能上获取用户昵称的时间。但如果用户修改了昵称,就需要改动所有关注了该用户的数据。如果用户修改信息的频率不高,是可以这么做的。

查找用户关注的列表时,需要找到以他自己用户名为前缀的数据块,然后再进行扫描。 当用户新关注一个人时,只用新加一行。取消时则删除该行。

注意: 高表带来性能的代价是放弃了原子性。在宽表中,更新用户关注列表用 put 就可以,是原子性的。但在 v3.0 里,需要添加,删除行。这些操作不是原子性的。

方案 v4.0

上面的方案里,行键是由用户的用户名组合而成。而用户名的长度是不固定的,我们可以把各个用户名MD5,这样就可以实现定长。方便计算扫描时的起始键和停止键。

而且加密后,也有助于数据更均匀的分布在 region 上。但是象关注这种数据,天生就会出现一些用户关注的多,一些少。所以就会造成分布不均匀,数据会集中到某些 region 上。而这些 region 将成为整个系统的瓶颈。数据示例如下:

行键 列族:f
md5(张三)md5(李四) 李四:昵称一
md5(张三)md5(王五) 王五:昵称二
md5(张三)md5(赵六) 赵六:昵称三

索引–目标数据访问

HBase里只有键可以建立索引(行键,列限定符,时间戳)。访问一个特定行的唯一办法就是通过行键。在列限定符和时间戳上建立索引可以在一行上扫描时直接跳到对应的列。

行键策略

有序特性

region 基于行键为一个区间的行提供服务,并且负责区间的每一行。 HFile 在硬盘上存储有序的行。当内存中的数据往硬盘上写的时候,它会被排序。

IO性能

HBase表的有序特性是一个重要的性质。比如要查找最新的一些信息,如果我们插入的时候就是拿时间作为键,查询时会非常快。因为数据在硬盘上保存就是排好序的。 负面影响就是这就会造成这些数据的访问频率会远高于一些老数据。因为大家会更关心最新数据,而不是旧数据。而最新的数据都集中在某些 region 中。这些 region 的性能就是系统的瓶颈。

写性能优化

当在 HBase 表里写入大量数据时,我们希望数据分布更平均一些。但这样就会影响读的效率。

列族配置

数据块大小

数据块大小可以在列族上进行设置。

每个HFile 默认是 64K(65536 字节)。数据块索引文件里记录了每个HFile数据块的起始键。数据块越小,索引文件越大,因为它要记录更多的数据。但是,随机查找的性能会越高,比如查单条数据,它能很快定义到某个更小的块中,然后再定位到需要的记录上。数据块越大,顺序查找的性能会越高,所以要根据对应的场景进行相应的配置。

可以在初始化表的时候设置数据块的大小:

hbase(main):001:0> create 'mytable', 
{NAME => 'colframe1', BLOCKSIZE => '65536'}

有时候需要关闭缓存。如:

表里的数据只被顺序扫描访问或很少访问,而且业务上不介意 get 或 scan 的时间是否有点长,这时候可以考虑关闭列族的缓存,可以在新建或修改表时设置:

hbase(main):001:0> create 'mytable', 
{NAME => 'colframe1', BLOCKCACHE => 'false'}

激进缓存

如果一些列族在业务上预计会比其它列访问会更多,可以将该列族设置更高的优先级:

hbase(main):001:0> create 'mytable', 
{NAME => 'colframe1', IN_MEMORY => 'true'}

IN_MEMORY 参数默认是 false。

bloom 过滤器

在访问一个特定行时,数据块索引提供有效的帮助。索引文件里存放了各数据块的起始键。

如果要查找一个短行,情况可能如下:

假如一行占用 100字节,一个 64K的数据块可以存 64 * 1024 / 100 ~= 700 行。但索引中只包括了起始行的位置。所以,要找到的行可能在一个特定的数据块区间内,但不确定在哪一个,需要依次查找。

bloom 过滤器提供类似: is_exsist 的方法,可以对每个块上的数据做是否存在的测试。当某行被请求时,先请求过滤器,检查是否存在。该过滤器可以以行为单位进行比较,也可以以列限定符作为比较的根据。

bloom 过滤器的功能需要提供额外的内存空间来存储数据。而且,行级过滤器要比列限定符级的占用空间要少。如果内存足够,可以用该方法来提升性能。

如:

hbase(main):001:0> create 'mytable', 
{NAME => 'colframe1', BLOOMFILTE => 'ROWCOL'}

BLOOMFILTE 参数默认值是 NONE。启动行级过滤器用 ROW。列限定符级用 ROWCOL。

行级过滤器检测特定行键是否存在。列限定符级过滤器检测行和列限定符的组合是否存在。

生存时间

应用中可能会有一些老数据是不会被访问的,比如新闻网站两年前的文章等。但可能后面一些特殊功能会需要查询,如:专题数据,专栏。所以,可以将某一时间点之前的数据归档存在硬盘的文件系统中。

HBase 可以在列族上设置一个TTL。早于指定TTL的数据会在下一次大合并的时候删除。如果同一单元上有多个时间版本,早于这个TTL的版本会被删除。

可以在建表的时候指定TTL,如果不设置,默认值是 2147483647(永不过期)。如:

hbase(main):001:0> create 'mytable', 
{NAME => 'colframe1', TTL => '18000'}

压缩

HFile 可以被压缩并存放在HDFS上。这有助于节省硬盘IO,但读取数据时的压缩和解压会提升CPU利用率。通常建议打开表的压缩设置。压缩设置有多种:LZO, Snappy, GZIP。

LZO和Snappy是最流行的两种。两者性能上差不多。但 Snappy 拥有BSD许可。所以建议使用:

hbase(main):001:0> create 'mytable', 
{NAME => 'colframe1', COMPRESSION => 'SNAPPY'}

注意: 数据只是在硬盘上压缩了,在 MemStore 里或网络传输时是没有的。

snappy安装:

下载 snappy:

http://google.github.io/snappy/

编译安装:

tar -zxf google-snappy-1.1.3-14-g32d6d7d.tar.gz
cd google-snappy-32d6d7d/
./autogen.sh
./configure
make
make install

默认的安装地址是在:/usr/local/lib

这时候查看该目录,会发现有一些库:

ll /usr/local/lib | grep snappy
-rw-r--r-- 1 root root  472574 7月  28 15:52 libsnappy.a
-rwxr-xr-x 1 root root     934 7月  28 15:52 libsnappy.la
lrwxrwxrwx 1 root root      18 7月  28 15:52 libsnappy.so -> libsnappy.so.1.3.0
lrwxrwxrwx 1 root root      18 7月  28 15:52 libsnappy.so.1 -> libsnappy.so.1.3.0
-rwxr-xr-x 1 root root  227696 7月  28 15:52 libsnappy.so.1.3.0

安装 hadoop-snappy:

https://code.google.com/archive/p/hadoop-snappy/

目前官网没有软件包提供,只能借助 svn 下载源码:

svn checkout http://hadoop-snappy.googlecode.com/svn/trunk/ hadoop-snappy

编译:

mvn package [-Dsnappy.prefix=SNAPPY_INSTALLATION_DIR]

单元时间版本

默认情况下 HBase 会为每个数据单元维护 3 个时间版本。也可以自己设定成 1。如:

hbase(main):001:0> create 'mytable', 
{NAME => 'colframe1', VERSIONS => 1}

也可以在创建时指定多个属性:

hbase(main):001:0> create 'mytable', 
{NAME => 'colframe1', VERSIONS => 1, TTL => '18000'}

也可以指定最少时间版本数,如:

hbase(main):001:0> create 'mytable', 
{NAME => 'colframe1', VERSIONS => 5, MIN_VERSION => '1'}

前面提到了,当时间版本早于TTL,这些版本会丢弃掉。但是如果指定了 MIN_VERSION,在合并时,最少会留下该参数值数的版本。这样,就算时间版本早于TTL,数据还是会有保留。

过滤数据

我们通过设计合适的行键,规划列族,让访问的数据在硬盘上也在一起,这样来减少读写操作时硬盘的寻道时间。但查询的时候通常会有一些条件,不会是单纯的顺序访问,所以我们可以进一步优化数据访问,过滤器就可以用来达到这一目的。

过滤器可以把过滤规则推送到服务器。在读取数据时,就对数据进行过滤,这样就可以减少服务器和客户端之间无用数据传输造成的IO浪费。当然,完整的数据还是需要从硬盘读进RegionServer,毕竟硬盘不可能配合过滤器去做过滤。

HBase 提供一个API,可以实现自己的过滤器,也可以使用一个或多个预置的过滤器。过滤器类必须实现 Filter 接口。或继承一个实现了该接口的抽象类。目前有一个 FilterBase 抽象类,已经实现了 Filter 接口。我们可以继承它,这样就不用去写接口的各个方法了。

当读取一行数据时,过滤器内有一些方法会被执行,但执行的顺序是固定的:

  1. 基于行键过滤
     boolean filterRowKey(byte[] buffer, int offset, int length)
    

    方法返回一个布尔值,意思就是,如果该行要被过滤掉,返回 true;否则返回 false。由于每读取一行都会执行该方法,所以,可以通过行键对每一行先进行过滤。

  2. 如果读取进来的行在上一步没有被过滤掉,它会进入这一步,并调用如下方法
     ReturnCode filterKeyValue(KeyValue v)
    

    方法返回一个ReturnCode,是一个在 Filter 接口中定义的枚举类型。返回值用来判断该 KeyValue 对象将要发生什么。具体要发生的事情在枚举定义中可以看到。

  3. 对KeyValue对象进行了过滤后,下面是这个方法:
     void filterRow(List<KeyValue> kvs)
    

    该方法传入成功通过过滤的 KeyValue 列表。在该方法里可以对列表中的元素进行任何操作,包括修改它的值。

  4. 经过上面操作后,这里又提供一次过滤行的机会,行会进入下面的方法:
     boolean filterRow()
    

    同第一步一样,返回 true 表示要过滤掉该行。

  5. 这一步的方法可以告诉过滤器提前结束这次扫描:
     boolean filterAllRemaining()
    

    当扫描很多行,在行键、列限定符或单元值里查找东西时,一旦找到目标,我们就不用关心剩下的行(比如按行键查找某个用户的信息,我们只需要一行)。这时候,这一步的方法就很方便。

另外还有一个 reset() 方法。它会重置过滤器,是由服务器调用的。

自定义过滤器

假设有如下需求:系统中一些老用户的密码强度不够,现在要求所有的密码都至少为 8 位。但有很多是不符合的。现在要找出不符合的用户。

定义过滤器:

我们可以自定义一个过滤器,从前面的五个过滤方法来看。判断密码长度显然不能通过行键来增行,只能通过 KeyValue 值来。所以我们要实现的自定义逻辑就是第 2 步的。代码如下:

package hbase;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// 继承 FilterBase
public class PasswordStrengthFilter extends FilterBase {
	private int len;
	private boolean filterRow = false;

	public PasswordStrengthFilter() {
		super();
	}

	// 构造函数采用要判断的密码长度作为输入参数
	public PasswordStrengthFilter(int len) {
		this.len = len;
	}

	// 检查密码长度.如果长度大于要求的值,该行会被过滤掉不返回
	public ReturnCode filterKeyValue(KeyValue v) {
		if (Bytes.toString(v.getQualifier()).equals("password")) {
			// 长度大于值,设置过滤状态
			if (v.getValueLength() >= len) {
				this.filterRow = true;
			}
			// 在返回的结果中,不返回密码,将密码排除
			return ReturnCode.SKIP;
		}
		return ReturnCode.INCLUDE;
	}

	// 告知该行是否被过滤
	public boolean filterRow() {
		return this.filterRow;
	}

	// 过滤器应用到给定行后,重置过滤器状态
	public void reset() {
		this.filterRow = false;
	}

	public void write(DataOutput out) throws IOException {
		out.writeInt(len);
	}

	public void readFields(DataInput in) throws IOException {
		this.len = in.readInt();
	}
}

由于过滤器是在服务端执行的。所以,我们要把自定义的过滤器打包成 jar,并放到 HBase 的加载目录,供 HBase 启动时加载。要不然在客户端使用该过滤器时会报错。

打包 jar :

进入项目目录并执行:

cd /Users/sunyu/Documents/workspace_java/hbase
mvn install

执行完上面命令后,会发现在项目目录下的 target 文件夹下会有个文件:hbase-1.0.0.jar 将该文件放到HBase服务器上。并修改服务端配置文件: $HBASE_HOME/conf/hbase-env.sh,将上一步得到的 jar 文件配置进去:export HBASE_CLASSPATH=/vagrant/hbase-1.0.0.jar 然后重启HBase

编辑测试脚本,测试代码如下:

package hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;

public class PasswordStrengthFilterExample {
	public static final byte[] TABLE_NAME = Bytes.toBytes("users");
	public static final byte[] INFO_FAM = Bytes.toBytes("info");

	public static final byte[] USER_COL = Bytes.toBytes("user");
	public static final byte[] NAME_COL = Bytes.toBytes("name");
	public static final byte[] EMAIL_COL = Bytes.toBytes("email");
	public static final byte[] PASS_COL = Bytes.toBytes("password");

	public static void main(String[] args) {
		try {
			Configuration conf = HBaseConfiguration.create();

			conf.set("hbase.zookeeper.quorum", "192.168.192.110");
			HBaseAdmin admin = new HBaseAdmin(conf);

			if (admin.tableExists("users")) {
				System.out
						.println("User table already exists.lists table users");

				// 连接池
				HTablePool pool = new HTablePool();

				HTableInterface users = pool.getTable("users");

				Scan scan = new Scan();
				scan.addColumn(PasswordStrengthFilterExample.INFO_FAM,
						PasswordStrengthFilterExample.PASS_COL);
				scan.addColumn(PasswordStrengthFilterExample.INFO_FAM,
						PasswordStrengthFilterExample.NAME_COL);
				scan.addColumn(PasswordStrengthFilterExample.INFO_FAM,
						PasswordStrengthFilterExample.EMAIL_COL);
				Filter f = new PasswordStrengthFilter(8);
				scan.setFilter(f);
				ResultScanner results = users.getScanner(scan);
				for (Result r : results) {
					// 获取列族 info 中的各个限定符的值
					String name = Bytes.toString(r.getValue(
							PasswordStrengthFilterExample.INFO_FAM,
							PasswordStrengthFilterExample.NAME_COL));
					String email = Bytes.toString(r.getValue(
							PasswordStrengthFilterExample.INFO_FAM,
							PasswordStrengthFilterExample.EMAIL_COL));
					String password = Bytes.toString(r.getValue(
							PasswordStrengthFilterExample.INFO_FAM,
							PasswordStrengthFilterExample.PASS_COL));

					System.out.println(name + "==" + email + "==" + password);
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}

	}
}

执行结果是:

Naoto Srinivas== Srinivas32@alopecia.com==null
Any Hais==areyousure@alopecia.com==null
Feny Ok==finetkx@alopecia.com==null
Sun Yu==howareyou@abc.com==null
Rob Ole==le23@tarboard.com==null
Ben Norbert==orbert 5@conflagrative.com==null
Tony anthony==testuser2@alopecia.com==null
Srinivas Micheal==vas Micheal38@wolfsbergite.com==null
Rodney Tollefsen==y Tollefsen97@areality.com==null

因为在过滤器中将密码这一列排队了,所以上面的结果中,密码都是 null

预装过滤器

HBase 自带了许多过滤器,可以直接使用。常见的过滤器有:

行过滤器 RowFilter

基于行键对数据进行过滤,可以进行精确匹配,字符串匹配、正则匹配等。 为了实例化 RowFilter,需要提供比较操作符和要比较的值。它的构造函数是:

public RowFilter(CompareOp rowCompareOp, WritableByteArrayComparable rowComparator)

比较操作符是在CompareOp里指定的。它是一个枚举类型。值有:

  1. LESS – 检查是否小于比较器里的值
  2. LESS_OR_EQUAL – 检查是否小于或等于比较器里的值
  3. EQUAL – 检查是否等于比较器里的值
  4. NOT_EQUAL – 检查是否不等于比较器里的值
  5. GREATER_OR_EQUAL – 检查是否大于或等于比较器里的值
  6. GREATER – 检查是否大于比较器里的值
  7. NO_OP – 默认返回 false, 因此过滤掉所有东西

比较器要继承 WritableByteArrayComparable 抽象类。可以直接使用的有:

  1. BinaryComparator – 使用 Bytes.compareTo() 方法比较
  2. BinaryPrefixComparator – 使用 Bytes.compareTo() 方法,从左开始执行基于前缀的字级节比较
  3. NullComparator – 检查给定值是否为空
  4. BitComparator – 执行按位比较
  5. RegexStringComparator – 通过正则比较
  6. SubstringComparator – 看是否包含某字符串

示例:

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class testFilter {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "192.168.192.110");
		HBaseAdmin admin = new HBaseAdmin(conf);

		byte[] INFO_FAM = Bytes.toBytes("info");

		byte[] USER_COL = Bytes.toBytes("user");
		byte[] NAME_COL = Bytes.toBytes("name");
		byte[] EMAIL_COL = Bytes.toBytes("email");
		byte[] PASS_COL = Bytes.toBytes("password");

		// 连接池
		HTablePool pool = new HTablePool();

		HTableInterface users = pool.getTable("users");

		Scan s = new Scan();
		s.addFamily(INFO_FAM);
		Filter rowFilter = new RowFilter(CompareOp.EQUAL,
				new RegexStringComparator(".*you"));
		s.setFilter(rowFilter);
		// 遍历表 users
		ResultScanner results = users.getScanner(s);
		for (Result r : results) {
			// 获取列族 info 中的各个限定符的值
			String name = Bytes.toString(r.getValue(INFO_FAM, NAME_COL));
			String email = Bytes.toString(r.getValue(INFO_FAM, EMAIL_COL));
			String user = Bytes.toString(r.getValue(INFO_FAM, USER_COL));
			String password = Bytes.toString(r.getValue(INFO_FAM, PASS_COL));

			System.out.println(name + "==" + email + "==" + user + "=="
					+ password);
		}

		users.close();
	}
}

执行结果:

Any Hais==areyousure@alopecia.com==areyousure==abc123
Sun Yu==howareyou@abc.com==howareyou==abc123

前缀过滤器

这是 RowFilter 的一种特例。它基于行键的前缀值进行过滤。它相当于给 scan 提供了一个结束区间。使用如下:

String prefix = "a";
Scan scan_prefix = new Scan(prefix.getBytes());
scan_prefix.setFilter(new PrefixFilter(prefix.getBytes()));

限定符过滤器

它是用来匹配列限定符而不是行键。但用法和前面的行键过滤差不多:

Filter col_filter = new QualifierFilter(CompareOp.LESS_OR_EQUAL,
				new BinaryComparator(Bytes.toBytes("name")));

值过滤器

// 过滤掉单元值不是以 areyou 开始的列
Filter value_filter = new ValueFilter(CompareOp.EQUAL,
				new BinaryPrefixComparator(Bytes.toBytes("areyou")));

运行结果:

null==areyousure@alopecia.com==areyousure==null

时间戳过滤器

该过滤器允许对各时间版本进行精细的控制,可以提供一个应用返回的时间戳列表,只有与列表匹配的单元才会返回。

List<Long> time_lists = new ArrayList<Long>();
time_lists.add(1466669143737L);

Filter time_filter = new TimestampsFilter(time_lists);

过滤器列表

如果需要进行多种过滤,可以将多个过滤器组合起来。多个过滤器可以以两种模式进行:MUST_PASS_ALL 或 MUST_PASS_ONE。前者表示必须满足所有的过滤器,后者表示只用满足一个即可。用法:

List<Filter> filter_list = new ArrayList<Filter>();
filter_list.add(time_filter);
filter_list.add(value_filter);
		
FilterList list_filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter_list);
		
list_filter.addFilter(col_filter);

过滤器列表中的过滤器会按照添加入列表的顺序执行。

协处理器

触发器 observer

一个HBase 请求的生命周期大致如下:

  1. 客户端发出 put 请求
  2. 请求被分派到合适 RegionServer 的 region 上
  3. region 收到 put 指令,进行处理,并返回响应
  4. 返回结果给客户端

observer 位于客户端和HBase之间。一次操作可以配置多个触发器,它们会按优先级次序执行。触发器的触发过程大致如下:

  1. 客户端发出 put 请求
  2. 请求被分派到合适 RegionServer 的 region 上
  3. 请求被拦截,然后在该表上登录的 RegionObserver 上调用 prePut()
  4. 如果没有 prePut()拦截,请求继续送到 region,后续处理
  5. region 的结果返回,并再次被拦截,RegionObserver 上的 postPut() 被调用
  6. 若没有被 postPut() 拦截,结果直接被返回

可以看出,在 prePut() 时就有可能直接返回一个结果或者中断请求。在 postPut() 也可以对处理的结果进行自定义的处理。

协处理器和 RegionServer 在相同的进程空间里。所以,协处理器的代码拥有服务器上 HBase用户进程的所有权限,意味着如果协处理器出错可能使整个 HBase 崩溃。

Observer 目前有三种:

  1. RegionObserver,在数据访问和操作阶段监听。
  2. WALObserver,预写日志。
  3. MasterObserver,对DDL事件监听,如表的创建,修改等。

自定义触发器

package hbase;

import java.io.IOException;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class UserObserver extends BaseRegionObserver {

	private HTablePool pool = null;

	@Override
	public void start(CoprocessorEnvironment env) throws IOException {
		pool = new HTablePool(env.getConfiguration(), Integer.MAX_VALUE);
	}

	@Override
	public void stop(CoprocessorEnvironment env) throws IOException {
		pool.close();
	}

	@Override
	public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e,
			final Put put, final WALEdit edit, final boolean writeToWAL)
			throws IOException {

		byte[] table = e.getEnvironment().getRegion().getRegionInfo()
				.getTableName();
		if (!Bytes.equals(table, Bytes.toBytes("users")))
			return;

		KeyValue kv = put.get(Bytes.toBytes("info"), Bytes.toBytes("password"))
				.get(0);
		String pwd = Bytes.toString(kv.getValue());

		String nowRowKey = Bytes.toString(kv.getKey());

		HTableInterface t = pool.getTable(Bytes.toBytes("users"));

		Put p = new Put(Bytes.toBytes(nowRowKey));
		p.add(Bytes.toBytes("info"), Bytes.toBytes("password"),
				Bytes.toBytes("obverserpwd" + pwd));
		t.put(p);

		t.close();

	}
}

安装触发器

进入项目目录,并执行打包命令:

cd ~/Documents/workspace_java/hbase/
mvn package

运行完后,在项目目录下的 target 目录下会有个 jar 文件:hbase-1.0.0.jar 将文件文件传到 HBase 服务器上, 路径为:/vagrant/hbase-1.0.0.jar

登录 hbase shell,并将 users 表下线,然后再安装触发器:

[root@vitual_box vagrant]# /usr/local/hbase-0.94.9/bin/hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.94.9, r1496217, Mon Jun 24 20:57:30 UTC 2013

hbase(main):001:0> disable 'users'
0 row(s) in 2.9400 seconds

hbase(main):002:0> alter 'users', METHOD => 'table_att', 'coprocessor' => 'file:///vagrant/hbase-1.0.0.jar|hbase.UserObserver|1001|'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.1090 seconds

hbase(main):003:0> enable 'users'
0 row(s) in 1.1050 seconds

hbase(main):004:0> describe 'users'
DESCRIPTION                                                                                             ENABLED
 'users', {METHOD => 'table_att', coprocessor$1 => 'file:///vagrant/hbase-1.0.0.jar|hbase.UserObserver| true
 1001|'}, {NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '
 0', VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_CEL
 LS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'tr
 ue'}
1 row(s) in 0.0370 seconds

上面操作中,关闭该表会让所有 region 下线。alter 命令更新表模式,让它知道新的协处理器。这种在线安装的办法只适用于 observer 协处理器。处理器属性参数用 | 字符分隔。 第一个参数是 jar 的路径,第二个参数是协处理器的类,第三个参数是处理器的优先级。当有多个协处理器时,会按该优先级顺序执行。

最后查看表的描述,可以看到协处理器已经应用上去了。

当往 users 表里插入新数据时,UserObserver 会起作用,会把用户的密码进行重新设置。测试:

put 'users', 'testobserver', 'info:password', '123456'

存储过程 endpoint

// TODO

分布式 HBase, HDFS, MapReduce

HBase 是构建在 hadoop 上的。目的是两个方面:

1.hadoop 的 map reduce 提供分布式计算框架,支持高吞吐量的数据访问 2.HDFS 提供可用性,可靠性

一个简单的示例可以参照: http://blog.sina.com.cn/s/blog_5f54f0be0101f6sj.html

HadoopMapReduce机制

为了提供一个普遍适用、可靠、容错的分布式计算框架,MapReduce对于如何实现应用程序有一定限制:

  1. 所有计算都被分解为 map 或者 reduce 任务来实现
  2. 每个任务处理全量数据中的一小部分
  3. 任务只关注自己的输入数据,不和其它任务通信

示例

时间计算案例

假设有一个服务器日志,用来记录用户在各个应用中花的时间,内容如下:

Data Time UserId Activity TimeSpent
2016-07-21 17:00 user1 load_page1 3s
2016-07-21 17:01 user1 load_page2 5s
2016-07-21 17:01 user2 load_page1 1s
2016-07-21 17:01 user3 load_page1 2s
2016-07-21 17:04 user4 load_page3 10s
2016-07-21 17:05 user1 load_page3 3s
2016-07-21 17:05 user3 load_page5 3s
2016-07-21 17:06 user4 load_page4 3s
2016-07-21 17:06 user1 purchase 3s
2016-07-21 17:10 user4 purchase 3s
2016-07-21 17:10 user1 confirm 3s
2016-07-21 17:11 user4 confirm 3s
2016-07-21 17:11 user1 load_page3 3s

如果我们要计算每个用户使用该应用所花的总的时间。一种最简单的办法就是遍历整个文件,为每个用户加总的 TimeSpent 值。将 UserID 作为键。伪代码可能如:

agg = {}
for line in file
	record = split(line)
	agg[record['UserId']] += record['TimeSpent']
end

print agg	

上面的方法虽然可行,但受限于单个机器的性能。如果日志文件太大(TB甚至更大的PB),上面的处理的耗时将是以“天”来计。 这里,我们能想到的办法是:将日志切分成N份,分配给 N 台不同的机器处理,处理完后再把所有结果整合。

JAVA代码如下:

package hbase;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TimeSpent {

	public static class Map extends
			Mapper<LongWritable, Text, Text, LongWritable> {

		private static final String splitRE = "\\W+";
		private Text user = new Text();
		private LongWritable time = new LongWritable();

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] splits = line.split(splitRE);
			if (null == splits || splits.length < 8)
				return;

			user.set(splits[5]);
			time.set(new Long(splits[7].substring(0, splits[7].length() - 1)));
			context.write(user, time);
		}
	}

	public static class Reduce extends
			Reducer<Text, LongWritable, Text, LongWritable> {

		public void reduce(Text key, Iterable<LongWritable> values,
				Context context) throws IOException, InterruptedException {
			long sum = 0;
			for (LongWritable time : values) {
				sum += time.get();
			}
			context.write(key, new LongWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {

		Path inputPath = new Path(
				"/Users/sunyu/Downloads/listing_user_time.txt");
		Path outputPath = new Path("./out/");

		Configuration conf = new Configuration();
		Job job = new Job(conf, "TimeSpent");
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		job.setMapperClass(Map.class);
		job.setCombinerClass(Reduce.class);
		job.setReducerClass(Reduce.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.addInputPath(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		FileSystem fs = outputPath.getFileSystem(conf);
		if (fs.exists(outputPath)) {
			System.out.println("Deleting output path before proceeding.");
			fs.delete(outputPath, true);
		}

		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
}

运行结果如下:

16/07/27 22:54:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/07/27 22:54:37 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
16/07/27 22:54:37 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
16/07/27 22:54:37 INFO input.FileInputFormat: Total input paths to process : 1
16/07/27 22:54:37 WARN snappy.LoadSnappy: Snappy native library not loaded
16/07/27 22:54:37 INFO mapred.JobClient: Running job: job_local_0001
16/07/27 22:54:37 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
16/07/27 22:54:37 INFO mapred.MapTask: io.sort.mb = 100
16/07/27 22:54:37 INFO mapred.MapTask: data buffer = 79691776/99614720
16/07/27 22:54:37 INFO mapred.MapTask: record buffer = 262144/327680
16/07/27 22:54:38 INFO mapred.MapTask: Starting flush of map output
16/07/27 22:54:38 INFO mapred.MapTask: Finished spill 0
16/07/27 22:54:38 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
16/07/27 22:54:38 INFO mapred.JobClient:  map 0% reduce 0%
16/07/27 22:54:40 INFO mapred.LocalJobRunner: 
16/07/27 22:54:40 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
16/07/27 22:54:40 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
16/07/27 22:54:40 INFO mapred.LocalJobRunner: 
16/07/27 22:54:40 INFO mapred.Merger: Merging 1 sorted segments
16/07/27 22:54:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 66 bytes
16/07/27 22:54:40 INFO mapred.LocalJobRunner: 
16/07/27 22:54:40 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
16/07/27 22:54:40 INFO mapred.LocalJobRunner: 
16/07/27 22:54:40 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
16/07/27 22:54:40 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out
16/07/27 22:54:41 INFO mapred.JobClient:  map 100% reduce 0%
16/07/27 22:54:43 INFO mapred.LocalJobRunner: reduce > reduce
16/07/27 22:54:43 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
16/07/27 22:54:44 INFO mapred.JobClient:  map 100% reduce 100%
16/07/27 22:54:44 INFO mapred.JobClient: Job complete: job_local_0001
16/07/27 22:54:44 INFO mapred.JobClient: Counters: 17
16/07/27 22:54:44 INFO mapred.JobClient:   File Output Format Counters 
16/07/27 22:54:44 INFO mapred.JobClient:     Bytes Written=46
16/07/27 22:54:44 INFO mapred.JobClient:   FileSystemCounters
16/07/27 22:54:44 INFO mapred.JobClient:     FILE_BYTES_READ=1480
16/07/27 22:54:44 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=64884
16/07/27 22:54:44 INFO mapred.JobClient:   File Input Format Counters 
16/07/27 22:54:44 INFO mapred.JobClient:     Bytes Read=535
16/07/27 22:54:44 INFO mapred.JobClient:   Map-Reduce Framework
16/07/27 22:54:44 INFO mapred.JobClient:     Reduce input groups=4
16/07/27 22:54:44 INFO mapred.JobClient:     Map output materialized bytes=70
16/07/27 22:54:44 INFO mapred.JobClient:     Combine output records=4
16/07/27 22:54:44 INFO mapred.JobClient:     Map input records=13
16/07/27 22:54:44 INFO mapred.JobClient:     Reduce shuffle bytes=0
16/07/27 22:54:44 INFO mapred.JobClient:     Reduce output records=4
16/07/27 22:54:44 INFO mapred.JobClient:     Spilled Records=8
16/07/27 22:54:44 INFO mapred.JobClient:     Map output bytes=182
16/07/27 22:54:44 INFO mapred.JobClient:     Total committed heap usage (bytes)=514850816
16/07/27 22:54:44 INFO mapred.JobClient:     Combine input records=13
16/07/27 22:54:44 INFO mapred.JobClient:     Map output records=13
16/07/27 22:54:44 INFO mapred.JobClient:     SPLIT_RAW_BYTES=114
16/07/27 22:54:44 INFO mapred.JobClient:     Reduce input records=4

运行完后,会在项目根目录下创建一个 out 文件夹,里面会有两个文件:

_SUCCESS
part-r-00000

part-r-00000 里的内容就是结果:

cat part-r-00000
user1	30
user2	2
user3	6
user4	35

分布式 HBase

Hbase是一种搭建在 Hadoop 上面的数据库。所以它才叫 HBase。理论上,HBase 可以运行在任务文件系统上。

region 和 RegionServer

HBase 中的表由行和列族 ,列组成。表中的数据可能达到数十亿行,数百万列。每个表的大小可能达到TB,甚至PB。显然不能在一台机器上存放整个表。表会切分成小一点的数据单位,然后分配到多台服务器上。这些小一点的数据单位就叫 region。管理 region 的服务器就叫 RegionServer。

每个 RegionServer 可以管理多个 region。 在 HBase 配置文件 hbase-site.xml 文件里,可以设置单个 region 的大小(HBase.hregion.max.filesize)。当一个 region 的大小超过该值的时候,它会被切成两个。

表被切成多个 region 时,它会被分配给 RegionServer。分配是没有什么预先设定好的规则的。 当一个 RegionServer 故障或者有新的 RegionServer 加入集群的时候,region 会被重新分配。

找到数据所在 region

对客户端程序来说,如何知道数据在哪个 RegionServer 呢?HBase 有两个特殊的表:-ROOT 和 .META。

-ROOT 不管多大,都不会被切割,它会指向知道答案的 .META region 在哪里。 .META 可能被切割,它记录了 region 在哪个 RegionServer 里。

数据访问顺序是:先找 -ROOT 表,查出 .META 的 region 位置(.META可能会有多个 region),然后再从那个 .META 的 region 里查出要找的 region 在哪个 RegionServer。

找到 -ROOT 表

通过 -ROOT 和 .META可以知道要访问的数据在哪个 RegionServer。那么,-ROOT 在哪个 RegionServer 呢?

HBase 是通过 zookeeper 来维护配置信息的。

当客户端和 HBase 系统交互时,步骤大致如下:

  1. 客户端 —-> Zookeeper -ROOT 在哪里?
  2. Zookeeper —-> 客户端 在 RegionServer RS1 上
  3. 客户端 —-> RS1上的 -ROOT 哪个 .META 的 region 可以帮我找到表T1的行 001
  4. RS1上的 -ROOT —-> 客户端 RS3上的 .META 表对应的 region M2 可以找到
  5. 客户端 —-> RS3 上的 .META表的 regionM2 表T1的行 001 在哪个 RegionServer 上的哪个 region?
  6. RS3 上的 .META表的 regionM2 —-> 客户端 在 RS3 上的 region T1R3 上
  7. 客户端 —-> RS3 的 T1R3 我要读取行 001
  8. RS3 的 T1R3 —-> 客户端 返回数据

HBase + MapReduce

HBase作为数据输入源

前面的示例中,是将本地文件作为数据源。我们也可以将HDFS中的文件作为输入,也可以直接使用HBase表中的数据。

当以 HBase 作为数据输入源时,每个 HBase 表的 region 都会启动一个 map 任务。一个任务读取一个 region。

示例

package hbase;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class CountShakespeare {

	public static class Map extends TableMapper<Text, LongWritable> {

		public static enum Counters {
			ROWS, SHAKESPEAREAN
		};

		private Random rand;

		/**
		 * Determines if the message pertains to Shakespeare.
		 */
		private boolean containsShakespear(String msg) {
			return rand.nextBoolean();
		}

		@Override
		protected void setup(Context context) {
			rand = new Random(System.currentTimeMillis());
		}

		@Override
		protected void map(ImmutableBytesWritable rowkey, Result result,
				Context context) {
			byte[] b = result.getColumnLatest(TwitsDAO.TWITS_FAM,
					TwitsDAO.TWIT_COL).getValue();
			if (b == null)
				return;

			String msg = Bytes.toString(b);
			if (msg.isEmpty())
				return;

			context.getCounter(Counters.ROWS).increment(1);
			if (containsShakespear(msg))
				context.getCounter(Counters.SHAKESPEAREAN).increment(1);
		}
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "192.168.192.110");

		Job job = new Job(conf, "TwitBase Shakespeare counter");
		job.setJarByClass(CountShakespeare.class);

		Scan scan = new Scan();
		scan.addColumn(TwitsDAO.TWITS_FAM, TwitsDAO.TWIT_COL);
		TableMapReduceUtil.initTableMapperJob(
				Bytes.toString(TwitsDAO.TABLE_NAME), scan, Map.class,
				ImmutableBytesWritable.class, Result.class, job);

		job.setOutputFormatClass(NullOutputFormat.class);
		job.setNumReduceTasks(0);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

运行结果:

00:26:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
00:26:08 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
00:26:08 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
00:26:11 INFO mapred.JobClient: Running job: job_local_0001
00:26:11 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
00:26:12 INFO mapred.JobClient:  map 0% reduce 0%
00:26:14 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
00:26:17 INFO mapred.LocalJobRunner: 
00:26:17 INFO mapred.LocalJobRunner: 
00:26:17 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
00:26:17 WARN mapred.FileOutputCommitter: Output path is null in cleanup
00:26:18 INFO mapred.JobClient:  map 100% reduce 0%
00:26:18 INFO mapred.JobClient: Job complete: job_local_0001
00:26:18 INFO mapred.JobClient: Counters: 21
00:26:18 INFO mapred.JobClient:   HBase Counters
00:26:18 INFO mapred.JobClient:     REMOTE_RPC_CALLS=10544
00:26:18 INFO mapred.JobClient:     RPC_CALLS=10544
00:26:18 INFO mapred.JobClient:     RPC_RETRIES=0
00:26:18 INFO mapred.JobClient:     NOT_SERVING_REGION_EXCEPTION=0
00:26:18 INFO mapred.JobClient:     MILLIS_BETWEEN_NEXTS=3866
00:26:18 INFO mapred.JobClient:     NUM_SCANNER_RESTARTS=0
00:26:18 INFO mapred.JobClient:     BYTES_IN_RESULTS=1938143
00:26:18 INFO mapred.JobClient:     BYTES_IN_REMOTE_RESULTS=1938143
00:26:18 INFO mapred.JobClient:     REMOTE_RPC_RETRIES=0
00:26:18 INFO mapred.JobClient:     REGIONS_SCANNED=1
00:26:18 INFO mapred.JobClient:   File Output Format Counters 
00:26:18 INFO mapred.JobClient:     Bytes Written=0
00:26:18 INFO mapred.JobClient:   hbase.CountShakespeare$Map$Counters
00:26:18 INFO mapred.JobClient:     ROWS=10541
00:26:18 INFO mapred.JobClient:     SHAKESPEAREAN=5223

HBase作为数据输出源

package hbase;

import java.util.Iterator;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

import hbase.TwitsDAO;
import hbase.UsersDAO;

public class HamletTagger {

	public static class Map extends TableMapper<ImmutableBytesWritable, Put> {

		public static enum Counters {
			HAMLET_TAGS
		};

		private Random rand;

		private boolean mentionsHamlet(String msg) {
			return rand.nextBoolean();
		}

		protected void setup(Context context) {
			rand = new Random(System.currentTimeMillis());
		}

		protected void map(ImmutableBytesWritable rowkey, Result result,
				Context context) {
			byte[] b = result.getColumnLatest(TwitsDAO.TWITS_FAM,
					TwitsDAO.TWIT_COL).getValue();
			String msg = Bytes.toString(b);
			b = result.getColumnLatest(TwitsDAO.TWITS_FAM, TwitsDAO.USER_COL)
					.getValue();
			String user = Bytes.toString(b);

			if (mentionsHamlet(msg)) {
				Put p = UsersDAO.mkPut(user, UsersDAO.INFO_FAM,
						UsersDAO.HAMLET_COL, Bytes.toBytes(true));
				ImmutableBytesWritable outkey = new ImmutableBytesWritable(
						p.getRow());
				try {
					context.write(outkey, p);
					context.getCounter(Counters.HAMLET_TAGS).increment(1);
				} catch (Exception e) {
					// gulp!
				}
			}
		}
	}

	public static class Reduce extends
			TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

		@Override
		protected void reduce(ImmutableBytesWritable rowkey,
				Iterable<Put> values, Context context) {
			Iterator<Put> i = values.iterator();
			if (i.hasNext()) {
				try {
					context.write(rowkey, i.next());
				} catch (Exception e) {
					// gulp!
				}
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "192.168.192.110");

		Job job = new Job(conf, "TwitBase Hamlet tagger");
		job.setJarByClass(HamletTagger.class);

		Scan scan = new Scan();
		scan.addColumn(TwitsDAO.TWITS_FAM, TwitsDAO.USER_COL);
		scan.addColumn(TwitsDAO.TWITS_FAM, TwitsDAO.TWIT_COL);
		TableMapReduceUtil.initTableMapperJob(
				Bytes.toString(TwitsDAO.TABLE_NAME), scan, Map.class,
				ImmutableBytesWritable.class, Put.class, job);
		TableMapReduceUtil.initTableReducerJob(
				Bytes.toString(UsersDAO.TABLE_NAME),
				IdentityTableReducer.class, job);

		job.setNumReduceTasks(0);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

运行结果:

00:32:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
00:32:19 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
00:32:20 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
00:32:22 INFO mapred.JobClient: Running job: job_local_0001
00:32:22 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
00:32:23 INFO mapred.JobClient:  map 0% reduce 0%
00:32:26 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
00:32:28 INFO mapred.LocalJobRunner: 
00:32:28 INFO mapred.LocalJobRunner: 
00:32:28 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
00:32:28 WARN mapred.FileOutputCommitter: Output path is null in cleanup
00:32:29 INFO mapred.JobClient:  map 100% reduce 0%
00:32:29 INFO mapred.JobClient: Job complete: job_local_0001
00:32:29 INFO mapred.JobClient: Counters: 20
00:32:29 INFO mapred.JobClient:   HBase Counters
00:32:29 INFO mapred.JobClient:     REMOTE_RPC_CALLS=10544
00:32:29 INFO mapred.JobClient:     RPC_CALLS=10544
00:32:29 INFO mapred.JobClient:     RPC_RETRIES=0
00:32:29 INFO mapred.JobClient:     NOT_SERVING_REGION_EXCEPTION=0
00:32:29 INFO mapred.JobClient:     MILLIS_BETWEEN_NEXTS=3861
00:32:29 INFO mapred.JobClient:     NUM_SCANNER_RESTARTS=0
00:32:29 INFO mapred.JobClient:     BYTES_IN_RESULTS=2634733
00:32:29 INFO mapred.JobClient:     BYTES_IN_REMOTE_RESULTS=2634733
00:32:29 INFO mapred.JobClient:     REMOTE_RPC_RETRIES=0
00:32:29 INFO mapred.JobClient:     REGIONS_SCANNED=1
00:32:29 INFO mapred.JobClient:   File Output Format Counters 
00:32:29 INFO mapred.JobClient:     Bytes Written=0
00:32:29 INFO mapred.JobClient:   File Input Format Counters 
00:32:29 INFO mapred.JobClient:     Bytes Read=0
00:32:29 INFO mapred.JobClient:   hbase.HamletTagger$Map$Counters
00:32:29 INFO mapred.JobClient:     HAMLET_TAGS=5279
00:32:29 INFO mapred.JobClient:   FileSystemCounters
00:32:29 INFO mapred.JobClient:     FILE_BYTES_READ=11963037
00:32:29 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=12111577
00:32:29 INFO mapred.JobClient:   Map-Reduce Framework
00:32:29 INFO mapred.JobClient:     Map input records=10541
00:32:29 INFO mapred.JobClient:     Spilled Records=0
00:32:29 INFO mapred.JobClient:     Total committed heap usage (bytes)=325058560
00:32:29 INFO mapred.JobClient:     SPLIT_RAW_BYTES=65
00:32:29 INFO mapred.JobClient:     Map output records=5279

案例:实时监控系统 openTSDB

本系统用来存储,索引,从大规模计算机系统采集来的监控指标数据,并用图表展示出来。类似百度统计,监控宝此类系统。

该系统存储的是按时序顺序存储的数据。虽然这种系统也可以使用 MySQL 此类关系数据库,但数据量大了后, HBase 的优势就体现出来了。我们不用关心数据分区的问题。而且我们可以使用 MapReduce 高效的处理离线数据。并且将在线实时查询的功能通过过滤器等把逻辑在服务端解决。

如果用 MySQL,假如我们 15 秒检测一次服务器状态;那一天可能有几百万数据;一个月呢?半年呢?

表设计

需要用到两个表。

表 tsdb 提供时间序列数据的存储和查询。就是所有的监控数据。

表 tsdb-uid 表维护全局唯一的UID索引,UID是对应监控指标的标签。如:CPU使用率,硬盘使用率,IO等,tsdb里记录各个标签的数据时,就用这个 uid 来区分。

创建表

通过 shell 脚本创建表:

#!/bin/sh

test -n "$HBASE_HOME" || {
	echo >&2 'HBASE_HOME must be set!'
	exit 1
}
test -d "$HBASE_HOME" || {
	echo >&2 'No such directory: HBASE_HOME=$HBASE_HOME'
	exit 1
}
TSDB_TABLE=${TSDB_TABLE-'tsdb'}
UID_TABLE=${UID_TABLE-'tsdb-uid'}
COMPRESSION=${COMPRESSION-'SNAPPY'}

exec "$HBASE_HOME/bin/hbase" shell <<EOF
create '$UID_TABLE',
{NAME => 'id', COMPRESSION => '$COMPRESSION'},
{NAME => 'name', COMPRESSION => '$COMPRESSION}

create '$TSDB_TABLE',
{NAME => 't', COMPRESSION => '$COMPRESSION}

EOF

上面脚本 创建了拥有列族 id,和 name 的表 tsdb-uid 以及拥有列族 t 的表 tsdb。

通常,每隔 15 秒或 30 秒会有监控程序检测一次状态,并将结果存入 tsdb 中。

表数据模式

tsdb-uid 表

tsdb-uid 用来管理UID。UID固定为 3 个字节宽,作为表 tsdb 外键关联用。 注册一个新的UID,会在表里插入两行数据:一行从标签名映射到UID,另一行从UID映射到标签名。

比如要监控CPU的负载 cpu.upload。

首先要生成一个新的UID(\x00\x00\x01),把UID作为行键,该行的 name 列族存储所有要监控标签的名称(cpu.upload),列限定符用来表示该行存储的是什么值,metrics 表示存储的是监控指标(标签)。

同时要用 cpu.upload 作为行键产生一行,id 列族存储上一步的UID(\x00\x00\x01)。并且还是使用 metrics 作为列限定符,表示它是监控指标(标签)。

数据可能是:

ROW COLUMN+CELL
\x00\x00\x01 column=name:metrics, value=cpu.upload
cpu.upload column=id:metrics, value=\x00\x00\x01
\x00\x00\x02 column=name:metrics, value=disk.io
disk.io column=id:metrics, value=\x00\x00\x02
\x00\x00\x03 column=name:metrics, value=mem.use
mem.use column=id:metrics, value=\x00\x00\x03

UID映射标签的记录可以用来表示UID是哪种监控,因为记录监控信息的时候都是用UID作为监控数据的行键。

标签映射UID的数据可以用来类似 autocomplate 的自动匹配功能。我们在监控结果页面查询某个监控指标时,比如 cpu.upload,当我们在输入框中输入 cpu 时,就可以从该表中查询出所有以 cpu 开头的记录,方便我们选择。它是通过限定行键的范围扫描实现的。

除了存储监控指标外,还可以存储一些其它的信息,如:主机名;把被监控的所有主机都录入到该表中。数据如下:

ROWCOLUMN+CELL
\x00\x01\x01 column=name:host, value=web-1
web-1 column=id:host, value=\x00\x01\x01
\x00\x01\x02 column=name:host, value=web-2
web-2 column=id:host, value=\x00\x01\x02
\x01\x01\x01 column=name:type, value=host
host column=id:type, value=\x01\x01\x01

注意,这里的UID是 \x00\x01 打头的,列族是 host;前面的UID是 \x00\x00 打头,列族是 metrics。

同理,我们可以把其它系统常量及扩展信息都存在该表中。

tsdb 表

tsdb 表是核心数据。存储时间序列上的监控结果。通常的查询方式是按日期范围和标签进行查询。这时候我们就可以精心设计一下行键,因为hbase里数据会按行键排序。行键设计可能如下:

监控指标UID 部分时间戳 标签1名称UID 标签1值UID
3字节 4字节 3字节 3字节

可以看到,tsdb-uid 表里的UID也被拿来当作 tsdb行键的一部分。

因为通常不会一下查询多个监控项(标签)的数据,所以把监控指标UID作为行键的最开头部分。

由于 HBase 是按行键顺序存储数据的,所以我们可以把行键里的时间戳四舍五入到 60 分钟。那么,这 60 分钟里,这个标签所有的监控数据的时间戳都是一样的,于是这些数据行键都一样,就都在一行显示。而各个具体的监控时间点的值,我们再用列限定符去区分

该表结果存储的时候,只有一个列族 t。列限定符,由时间和掩码组合而成。由于前面的行键中的时间戳只计算到了小时的时间,这里则存的是除小时外的精确时间。

假设现在有一条数据,是检测CPU负载(\x00\x00\x01),值是 25,检测的时间是 2016-07-28 07:00:00(unix值是 1469660400,它会被切割成 1469660000 和 400 ),检测的机器的 host (\x01\x01\x01)是 web-1(\x00\x01\x01)。

则它的数据应该如下: 行键:\x00\x00\x01 + 1469660000 + \x01\x01\x01 + \x00\x01\x01

列限定符:400 + 0x07 值:25

这里是从 bit 级别上去构思的,主要是为了优化性能。 每行存储多个观测值,是为了让带过滤器的扫描一次过滤就能滤掉更多的数据。这也会大大减少基于 bloomfilter 需要跟踪的行数。

存储数据

tsdb-uid

在业务开展前,我们要把系统常量都整理好,存入 tsdb-uid 中。如:要监控的指标,标签(如: host),标签值(如: web-1)。而且,每次插入的时候要判断当前是否已经有这个UID。

tsdb

每次存储时,重要的是生成行键。

行键在HBase存储时是一个二进制码流,Rowkey的长度被很多开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节。

目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。

查询数据

系统查询功能主要为了满足如下几个需求:

  1. 标签自动补全 使用行键扫描功能来完成此功能,比如用户输入 web,我们就可以以此为扫描的起始键,查找以这个为前缀的条目,一直到 wez结束。

  2. 查询某监控指标的时间序列数据 从监控数据表里查数据,也使用行键过滤功能,不过规则要复杂一些,因为行键是由多个值组合而成的:监控指标UID 部分时间戳 标签1名称UID 标签1值UID 象这种过滤器要在服务器端使用,而不是在客户端。因为这样可以减少每次请求时传递给客户端的数据量。

案例:广告实时计算系统

互联网广告,讲究的是实时。而它的数据量则是海量的,特别是展现、点击数据非常多。所以传统的关系型数据库是无法满足业务需要的。我们要处理的数据主要有:

  1. 展现数据。如:用户ID、广告ID、IP、时间等。
  2. 点击数据。
  3. 行为数据。如:通过广告位获得的用户下载、安装、或交易等后续的数据。
  4. 第三方监控数据。为了体现数据的公正,通常会接入第三方的统计监控数据。

主要功能

记录广告的展现、点击数据,并根据历史数据制定广告投放策略。

提供某广告在两个月内每天展现量的统计,并且可以分省份、地市、用户三个维度统计。

系统架构

该系统分为如下几层:

  1. 日志接收层。nginx + flume + kafka 将日志转换成实时数据流。
  2. 数据处理层。storm 从 kafka 读取数据流,并进行实时处理。
  3. 存储层。使用 HBase 进行数据存储。

表设计

广告展现数据表

由于广告要分省、市、用户分别进行查询。所以这里行键需要三个。整体设计如下:

结构
表名 adpv_stat
行键 [AD_ID]_[省ID]_[日期]
[AD_ID]_[市ID]_[日期]
[AD_ID]_[用户ID]_[日期]
列族 pv
列名 cnt

测试数据可能如下:

2211_ d930807e48a46653a72ccba6f5290bb1-2016-09-01

广告点击数据表

结构
表名 adclick_stat
行键 [AD_ID]_[日期]
列族 clk
列名 cnt

集群部署

// TODO

运维

// TODO


Similar Posts

上一篇 Docker 实战

下一篇 HBase-PHP

评论