孙宇的技术专栏 大数据/机器学习

Hadoop

2018-11-17

阅读:


Hadoop

提供可靠的且可扩展的存储和分析平台。 Hadoop不止是 HDFS 和 MapReduce ,它涉及到了大数据相关的一整个生态。

HDFS

传统的关系型数据库系统将数据存在操作系统的硬盘里,但有个硬伤就是硬盘寻址时间的提升远远赶不上传输速率的提升。这就是访问延迟的主要原因。将所有数据存在一个大硬盘里读取时要更多的时间。所以一种解决方案是将数据分散到多个小硬盘中,读取时同时读取。

数据存储在多个硬盘中首先要解决的就是硬件故障问题,为了保证高可用,最常见的作法就是“冗余”,系统内有多个数据复本。冗余硬盘阵列(RAID)就是这个原理。HDFS 也如此不过方式上有区别。

RDBMS通常用BTree的形式存储数据,这样在寻址会少很多,速度就会快很多。在少量更新数据时,性能没问题;但如果大量更新就很慢,因为数据要重新排序,合并。

RDBMS适合于索引后数据集的查询和更新,通过索引减少数据寻址。

MapReduce

基于一个批处理系统,但不适合交互式分析。通常用来离线数据分析。它提出一个编程模型,抽象出这些硬盘读/写问题并将其转换为对一个数据集(由键/值对组成)的计算,由 map 和 reduce 两部分组成。

HBase

提供在线访问的功能,使用 HDFS 作为底层数据存储。

YARN

集群资源管理系统,Yet Another Resource Negotiator,允许任何一个分布式程序基于 Hadoop 集群的数据而运行。

流处理

Storm, Spark 等使用流式输入并进行计算,并向 Hadoop 存储系统或外部系统发布结果。

搜索

Solr, ElasticSearch 可以基于 HDFS 提供搜索服务。

迭代处理

许多算法,如机器学习算法,需要将之前的计算结果作为依据,即提供迭代处理功能。MapReduce 不支持这种计算,Spark 就可以实现。

##发行版本

Apache版本

完全免费的版本,文档资料多。但版本多,管理复杂,部署安装等复杂。在 Hadoop 生态圈里和其它组件合作时要考虑版本兼容问题。

企业实际使用并不多。

第三方开源版本

Cloudera发行版(CDH)

基于Apache协议,100%开源。版本管理清晰。提供了部署、安装、配置工具,大大提高了集群部署的效率,可以在几个小时内部署好集群,并对集群的节点及服务进行实时监控。

目前使用最为广泛。

Hortonworks发行版(HDP) 第一家使用了Apache HCatalog的元数据服务特性的提供商。并且,它们的Stinger开创性地极大地优化了Hive项目。Hortonworks为入门提供了一个非常好的,易于使用的沙盒。

MapR 重写了HDFS。可以使用本地Unix命令来代替Hadoop命令。

安装

Hadoop 必须先安装JAVA

JAVA 安装

tar -zxf jdk-7u45-linux-x64.gz
mv jdk1.7.0_45 /usr/local/jdk1.7

修改 /etc/profile ,在底部添加 JAVA 相关设置,如下:

export JAVA_HOME=/usr/local/jdk1.7
export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:.
export PATH=$JAVA_HOME/bin:$PATH

查看安装情况:

java -version

java version "1.7.0_45"
Java(TM) SE Runtime Environment (build 1.7.0_45-b18)
Java HotSpot(TM) 64-Bit Server VM (build 24.45-b08, mixed mode)

Hadoop 安装

http://www.apache.org/dyn/closer.cgi/hadoop/common/
http://mirror.bit.edu.cn/apache/hadoop/common/stable/hadoop-2.9.1.tar.gz
tar -zxf hadoop-2.9.1.tar.gz
mv hadoop-2.9.1 /usr/local/hadoop2.9.1

修改 /etc/profile ,在底部添加 hadoop 相关设置,如下:

export JAVA_HOME=/usr/local/jdk1.7
export HADOOP_HOME=/usr/local/hadoop2.9.1
export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:.
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$JAVA_HOME/bin:$PATH

查看安装情况:

hadoop version

Hadoop 2.9.1
Subversion https://github.com/apache/hadoop.git -r e30710aea4e6e55e69372929106cf119af06fd0e
Compiled by root on 2018-04-16T09:33Z
Compiled with protoc 2.5.0
From source with checksum 7d6d2b655115c6cc336d662cc2b919bd
This command was run using /usr/local/hadoop2.9.1/share/hadoop/common/hadoop-common-2.9.1.jar

免密登录设定

单机调试,所有进程都运行在同一节点上,但进程之间还是和在集群中一样,通过SSH进行通信。所以,这里需要能通过SSH不输密码直接登录到 localhost 上。

[root@localhost www]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa): 回车
Enter passphrase (empty for no passphrase): 回车
Enter same passphrase again: 回车
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
11:00:39:f3:8b:d7:19:47:1e:67:06:7e:cd:9f:a0:4f root@localhost

这样就生成了SSH登录的公钥和密钥。需要将公钥添加到要远程免密登录的机器的 ~/.ssh/authorized_keys

[root@localhost sbin]# cd ~/.ssh/
[root@localhost .ssh]# cat id_rsa.pub >> authorized_keys

这时可以直接免密码SSH登录本机了:

[root@localhost .ssh]# ssh root@localhost

关闭防火墙

systemctl disable firewalld.service
systemctl stop firewalld.service

分布式安装

默认安装的时候是单机安装,可以用来调试。如果想在本机尝试集群环境,可以使用伪分布式模式。线上真实多机集群则用全分布式模式。

伪分布式

在伪分布式模式下,要创建配置文件并将其放在hadooop安装目录的 etc/hadoop 目录下。也可以放在其它目录,然后通过设置 HADOOP_CONF_DIR 环境变量来实现配置文件自定义放置。配置文件内容如下:

core-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<property>
			<name>fs.defaultFS</name>
			<value>hdfs://localhost:9000</value>
			<description>NameNode</description>
	</property>
	<property>
    		<name>hadoop.tmp.dir</name>
    		<value>/usr/local/hadoop2.9.1/data</value>
  	</property>
</configuration>

注意,这里配置了HDFS的数据目录,要先在本地文件系统中创建该目录:mkdir -p /usr/local/hadoop2.9.1/data。 默认的配置里是没有这一项的,如果不配置该项,系统会在 /tmp 目录下创建临时数据目录,但关机再开机时该目录会变动而找不到地址,导致 namemode 无法启动。直观看就是 jps 里没有 namemode,以及监听端口里没有 9000 和 50070。如果发现启动有问题,可以查看安装目录下的 logs 目录查看

mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
	<property>
		<name>mapreduce.framework.name</name>
		<value>yarn</value>
	</property>
	<property>  
        <name>mapreduce.jobhistory.address</name>  
        <value>0.0.0.0:10020</value>  
	</property>
</configuration>

yarn-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
	<property>
		<name>yarn.resourcemanager.hostname</name>
		<value>0.0.0.0</value>
	</property>
	<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
	</property>
</configuration>

hdfs-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
	<property>
		<name>dfs.replication</name>
		<value>1</value>
		<description>cnt</description>
	</property>
</configuration>

配置完后,先格式化,然后在 hadoop 的安装目录下的 sbin 目录内启动 hadoop:

hdfs namenode -format
cd /usr/local/hadoop2.9.1/sbin
start-dfs.sh
start-yarn.sh

完成后它会启动诸如 namenode, datanode, SecondaryNameNode 等组件,可以通过 jps 命令查看:

[root@localhost sbin]# jps
6738 ResourceManager
6404 DataNode
7032 Jps
6577 SecondaryNameNode
6267 NameNode
6855 NodeManager

这时候可以查看当前正在监听的端口,发现:

[root@localhost sbin]# netstat -openlut
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       User       Inode      PID/Program name     Timer
tcp        0      0 0.0.0.0:50020           0.0.0.0:*               LISTEN      0          49246      6404/java            off (0.00/0/0)
tcp        0      0 127.0.0.1:9000          0.0.0.0:*               LISTEN      0          47962      6267/java            off (0.00/0/0)
tcp        0      0 0.0.0.0:50090           0.0.0.0:*               LISTEN      0          50280      6577/java            off (0.00/0/0)
tcp        0      0 0.0.0.0:50070           0.0.0.0:*               LISTEN      0          47955      6267/java            off (0.00/0/0)
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      0          15627      1135/sshd            off (0.00/0/0)
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      0          16691      1545/master          off (0.00/0/0)
tcp        0      0 0.0.0.0:50010           0.0.0.0:*               LISTEN      0          48972      6404/java            off (0.00/0/0)
tcp        0      0 0.0.0.0:50075           0.0.0.0:*               LISTEN      0          49243      6404/java            off (0.00/0/0)
tcp        0      0 127.0.0.1:55740         0.0.0.0:*               LISTEN      0          48978      6404/java            off (0.00/0/0)
tcp6       0      0 127.0.0.1:8033          :::*                    LISTEN      0          55050      6738/java            off (0.00/0/0)
tcp6       0      0 :::8040                 :::*                    LISTEN      0          54139      6855/java            off (0.00/0/0)
tcp6       0      0 :::8042                 :::*                    LISTEN      0          55047      6855/java            off (0.00/0/0)
tcp6       0      0 :::22                   :::*                    LISTEN      0          15629      1135/sshd            off (0.00/0/0)
tcp6       0      0 127.0.0.1:8088          :::*                    LISTEN      0          54137      6738/java            off (0.00/0/0)
tcp6       0      0 :::40377                :::*                    LISTEN      0          54131      6855/java            off (0.00/0/0)
tcp6       0      0 ::1:25                  :::*                    LISTEN      0          16692      1545/master          off (0.00/0/0)
tcp6       0      0 :::13562                :::*                    LISTEN      0          55046      6855/java            off (0.00/0/0)
tcp6       0      0 127.0.0.1:8030          :::*                    LISTEN      0          51406      6738/java            off (0.00/0/0)
tcp6       0      0 127.0.0.1:8031          :::*                    LISTEN      0          51384      6738/java            off (0.00/0/0)
tcp6       0      0 127.0.0.1:8032          :::*                    LISTEN      0          51956      6738/java            off (0.00/0/0)
udp        0      0 0.0.0.0:68              0.0.0.0:*                           0          21329      3830/dhclient        off (0.00/0/0)
udp        0      0 0.0.0.0:46650           0.0.0.0:*                           0          21256      3830/dhclient        off (0.00/0/0)
udp6       0      0 :::43191                :::*                                0          21257      3830/dhclient        off (0.00/0/0)

其中,50070 是hadoop管理端的端口;8088是资源管理器端口。可以通过浏览器进行访问。

http://192.168.111.111:50070/
http://192.168.111.111:8088/

测试数据

示例数据:

https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all

这里只有两年的数据 1901.gz 和 1902.gz。

完整数据:

#!/bin/bash
for i in {1901..2018}
do
cd /data/soft/ncdc
wget --execute robots=off -r -np -nH --cut-dirs=4 -R index.html* ftp://ftp.ncdc.noaa.gov/pub/data/noaa/$i/
done

有许多个压缩文件,如 029070-99999-1901.gz。分别记录了各年的一些温度数据。

数据上传HDFS

将完整数据各个包先解压,然后把文件内容合并到一个大文本内,再把这一个大文件gzip压缩,并上传到HDFS 里。

#!/bin/bash

# 处理全部数据
for file in ncdc/*.gz
do
  echo "$file"
  gunzip -c $file >> /home/www/bigdata/target/all.txt
done

# -put 后的 - 表示从标准输出中读取,也就是接受前面压缩后通过管道传过来的数据
# gzip /home/www/bigdata/target/all.txt | hadoop fs -put - /home/hdfs/
# 也可以分成两步:
gzip /home/www/bigdata/target/all.txt
hadoop fs -put /home/www/bigdata/target/all.txt.gz /home/hdfs/

这里产生的文件将非常大,十几个G。可以少处理一些测试数据:

#!/bin/bash

# 只处理 4 年的数据
for file in `ls ncdc | grep -E '1901.gz|1902.gz|1903.gz|1904.gz'`
do
  echo "$file"
  gunzip -c ncdc/$file >> /home/www/bigdata/target/fouryear.txt
done

# -put 后的 - 表示从标准输出中读取,也就是接受前面压缩后通过管道传过来的数据
# gzip /home/www/bigdata/target/fouryear.txt | hadoop fs -put - /home/hdfs/
# 也可以分成两步:
gzip /home/www/bigdata/target/fouryear.txt
hadoop fs -put /home/www/bigdata/target/fouryear.txt.gz /home/hdfs/

只有 4 年的数据,最后的未压缩文件也有 2.5G。

运行理解

传统方式中,通常用 shell 命令,awk 之类命令进行数据、日志分析。但如果文件特别大,数据量多,分析过程会很慢。比如在一些打包好的记录每年温度数据的文件中找到每一天的最高温度数据。shell 代码可能如下:

#!/usr/bin/env bash

for year in all/*
do
    echo -ne `basename $year .gz`"\t"
    gunzip -c $year | \
        awk '{ temp = substr($0, 88, 5) + 0;
            q = substr($0, 93, 1);
            if ( temp != 9999 && q ~/[01459]/ && temp > max) max = temp}
            END { print max }
            '
done

将前面的测试数据放在该代码同级目录下的 all 目录下。并执行脚本:

[root@localhost hadoop]# ./temp.sh
1901	317
1902	244

可以看到计算的结果是,1901 年的最高温度是 317,1902 年是 244。这里的温度是实际温度的 10 倍。

如果数据量很大,我们可以尝试将一年的数据放在一台机器上。这样将任务进行切割,或者用其它的方式切割。但切割带来的问题是分布要均匀,要不然就无法利用所有机器的性能。

Hadoop 框架就可以解决这种困扰。它采用 MapReduce 模式,先将数据按我们定义的 Map 方式分配给多台机器进行处理,各机器得到数据后进行逻辑处理,然后结果再汇总。整个过程中,数据是用键值对的形式传递。

创建项目

创建一个 maven 项目,pom.xml 内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>sunyu</groupId>
    <artifactId>hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- Component versions are defined here -->
        <hadoop.version>2.9.1</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

</project>

Map

在项目目录 src/main/java/ 下创建类 MaxTempMapper, 内容如下:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 读取输入内容
        String line = value.toString();
        // 从内容中获取当前的年份
        String year = line.substring(15, 19);
        int airTemperature;
        // 读取当前记录对应的温度
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // 输出年份和温度
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

Reduce

在项目目录 src/main/java/ 下创建类 MaxTempReduce, 内容如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class MaxTempReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        // 当前得到的输入是年份对应的温度列表,需要从中找到当前年份的最大温度
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

Job定义

在项目目录 src/main/java/ 下创建类 MaxTemp, 内容如下:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemp {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("用法: MaxTemp <输入路径> <输出路径>");
            System.exit(-1);
        }

        // 定义任务各参数
        Job job = new Job();
        job.setJarByClass(MaxTempMapper.class);
        job.setJobName("Max Temp");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

打包

在项目目录里执行命令:

mvn package

命令执行完后,在项目目录下的 target 目录里会有一个 hadoop-1.0-SNAPSHOT.jar 文件。

运行

在项目目录下执行:

hadoop jar target/hadoop-1.0-SNAPSHOT.jar MaxTemp all/sample.txt output

运行完成后会在当前目录下创建一个 output 目录 ,目录里有文件part-r-00000,打开文件,内容是:

1901	317
1902	244

可以看到,这里的结果和前面用 shell 计算的结果一致。目前这个示例没有用到HDFS,也没有用到hadoop集群。如果数据量很大,就需要了。

HDFS

Hadoop 提供了一个分布式的文件存储系统,就叫 HDFS。它可以存储超大文件,如几百G,几百T,甚至PB级的。这些在其它的文件系统是做不到的。

HDFS的构建思路是:一次写入、多次读取。数据存储后,会多次进行该数据的各种分析。每次分析都涉及到该数据集的大部分甚至是全部。所以,读取整个数据的时间延迟比第一条记录寻址时间延迟更重要。另外,如果需要低延迟的数据访问,HDFS是不适合的。它是为大数据的高吞量而设计的。如果要低延迟访问,可以尝试 Hbase。

Hadoop 不需要运行在昂贵的三件上。用在一些低配置的多台机器上就行。各台机器将文件系统的元数据存储在内存中(元数据不是文件内容本身)。因此,文件系统能存储的数据总量就受到各机器内存的限制。每个文件、目录、数据块的元信息大约占 150 字节。所以,如果有 100 万个文件,且每个文件占一个数据块,那么至少需要 300MB的内存,依次类推。所以,最好是存储单个比较大的文件。

数据块

数据块是磁盘进行数据读写的最小单位。构建于单个磁盘之上的文件系统通过磁盘块来管理文件系统中的块,该文件系统块的大小可以是磁盘块的整数倍。文件系统块一般为几千字节,磁盘块一般为 512字节。

HDFS 的块要大得多,默认是 128MB。设计成这么大的块,目标就是为了最小化寻址开销。但设置的太小也不好,因为 MapReduce 的 map 任务通常一次只处理一个块中的数据,如果设计的太少(少于集群中节点数量),作业的运行速度就会比较慢。

和单个磁盘文件系统一样,HDFS 上的文件被划分为块大小的多个分块,作为独立的存储单元。不同的是,HDFS里如果文件大小小于块大小,它不会占用整个块空间,即:1MB的文件不会占用整个 128M 的块。

分块存储的设计方便存储和管理,文件的元数据单独存储,如权限等。另外,块的存储形式就可以实现冗余存储,进而提供高可用性。

namenode, datanode

HDFS 集群有两类节点,一个是管理节点 namenode,一个是工作节点 datanode。

namenode 管理文件系统的命名空间。它维护文件系统树及所有的文件和目录。这些信息以两个文件形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件。它也记录着每个文件中各个块所在的数据节点的信息,但它不永久保存这些信息,因为这些信息会在系统启动时根据数据节点信息重建。

datanode 是文件系统的工作节点。它们根据需要存储并检索的数据块,并定期向 namenode 发送它们所存储的块的列表。

没有 namenode,文件系统将无法使用。如果运行 namenode 的机器毁坏,文件系统上所有的文件将会丢失,因为我们不知道如何根据 datanode 的块重建文件。

块缓存

通常,datanode 从磁盘中寻址并读取块,但对于访问频繁的文件,其对应的块可能被显式的缓存在 datanode 的内存中。默认一个块仅缓存在一个 datanode 的内存中。MapReduce, Spark 就会利用缓存提高读操作的性能。

高可用性

在多个文件系统中冗余 namenode 元数据以及备用 namenode 可以防止数据丢失。但还是会有单点失效问题。这时候就要有主从结构或者说是冷热结构。当 namenode 失效时,备用 namenode 接管任务。

这样就需要实现备机和主机的数据同步,并且 datanode 每次要向两个 namenode 发送数据块处理报告。因为数据块的相关信息是存储在 namenode 的内存中,而非磁盘。

命令行基本文件操作

安装完毕 hadoop 后,启动 hadoop, 首先要输入一个命令来格式化HDFS:

hdfs namenode -format

然后启动:

cd /usr/local/hadoop2.9.1/sbin
start-dfs.sh
start-yarn.sh

创建目录

hadoop fs -mkdir -p /home/hdfs

查看目录

hadoop fs -ls /
Found 1 items
drwxr-xr-x   - root supergroup          0 2018-11-18 15:49 /home

hadoop fs -ls -R /
drwxr-xr-x   - root supergroup          0 2018-11-18 15:49 /home
drwxr-xr-x   - root supergroup          0 2018-11-18 15:49 /home/hdfs

上传文件

hadoop fs -put NOTICE.txt /home/hdfs/
hadoop fs -ls /home/hdfs
Found 1 items
-rw-r--r--   1 root supergroup      15915 2018-11-18 15:53 /home/hdfs/NOTICE.txt

查看文件

get 命令和 put 相反,是将 HDFS 中的文件复制到本地文件系统。

hadoop fs -get /home/hdfs/NOTICE.txt /data/soft/

删除文件

hadoop fs -rm /home/hdfs/NOTICE.txt

删除目录

hadoop fs -rm -r /home/hdfs

JAVA版本基本文件操作

创建项目

新建 maven 项目 hadoop,pom.xml 内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>sunyu</groupId>
    <artifactId>hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- Component versions are defined here -->
        <hadoop.version>2.9.1</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

</project>

创建 java 代码,在项目根目录下的 src/main/java/filesystem/ 目录下创建

RegexExcludePathFilter.java:

package filesystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexExcludePathFilter implements PathFilter {
  
  private final String regex;

  public RegexExcludePathFilter(String regex) {
    this.regex = regex;
  }

  public boolean accept(Path path) {
    return !path.toString().matches(regex);
  }
}

HadoopFileOper.java:

package filesystem;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;

public class HadoopFileOper {
    static {
        // java 程序无法识别 hdfs 的路径,需要将 url 处理的工厂设置一下
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    /**
     * 通过 URL 形式读取文件.这种方式必须设置 UrlStreamHandlerFactory
     *
     * @throws Exception
     */
    public static void catFileByUrl() throws Exception {
        InputStream in = null;
        try {
            in = new URL("hdfs://localhost:9000/home/hdfs/NOTICE.txt").openStream();
            // 在输入流和输出流间复制数据
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            // 关闭数据流
            IOUtils.closeStream(in);
        }
    }

    /**
     * 通过 FileSystem 读取 hdfs 中的文件
     *
     * @throws Exception
     */
    public static void catFileByFileSystem() throws Exception {
        // 读取配置文件中的内容,如 core-site.xml 中的配置
        Configuration conf = new Configuration();
        String uri = "hdfs://localhost:9000/home/hdfs/NOTICE.txt";
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }

    /**
     * 从本地文件系统上传文件到 hdfs
     * 上传文件时会自动创建目录
     *
     * @throws Exception
     */
    public static void uploadFile() throws Exception {
        String localSrc = "/home/www/bigdata/hadoop/test.txt";
        String dst = "hdfs://localhost:9000/home/hdfs/test.txt";

        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });

        IOUtils.copyBytes(in, out, 4096, true);
    }

    /**
     * 在 hdfs 上删除文件
     *
     * @throws Exception
     */
    public static void deleteFile() throws Exception {
        String toDel = "hdfs://localhost:9000/home/hdfs/test.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(toDel), conf);
        Path f = new Path(toDel);
        // 如果第一个参数是文件或空目录,第二个参数会被忽略
        // 如果第一个参数是非空的目录,且第二个值是 true,目录及其内容才会被删除
        boolean delete = fs.delete(f, true);
    }

    /**
     * 查看文件元数据
     *
     * @throws Exception
     */
    public static void listStatus() throws Exception {
        String uri = "hdfs://localhost:9000/home/hdfs/NOTICE.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        Path[] paths = new Path[1];
        paths[0] = new Path(uri);

        FileStatus[] status = fs.listStatus(paths);

        for (FileStatus stat : status) {
            System.out.printf(stat.getPath().toUri().getPath());
        }

        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }

    /**
     * 用表达式查找文件
     *
     * @throws Exception
     */
    public static void searchFile() throws Exception {
        String uri = "hdfs://localhost:9000/home/";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        // 从第一个参数的路径中找文件,第二个参数是要过滤掉的文件模式
        FileStatus[] lists = fs.globStatus(new Path("/home/hdfs/*"), new RegexExcludePathFilter("^.*.doc$"));

        for (FileStatus item : lists) {
            System.out.printf(item.getPath().toUri().getPath());
        }
    }

    public static void main(String[] args) throws Exception {
//        catFileByUrl();
//        catFileByFileSystem();
//        uploadFile();
//        deleteFile();
//        listStatus();
        searchFile();
    }
}

常用的一些操作都在文件中。要测试时要先打包项目,然后在 hadoop 中执行

打包项目

使用 mvn 命令打包,在命令行中进入项目根目录,并执行:

mvn package

执行完毕后,在项目的 target 目录下会有一个 hadoop-1.0-SNAPSHOT.jar 文件。这就是打包后的项目文件。

测试

在命令行中进入项目根目录,并执行:

hadoop jar target/hadoop-1.0-SNAPSHOT.jar filesystem/HadoopFileOper

运行完后可以看到相应的结果。命令里第一个参数是项目打包后文件的相对路径,第二个参数是要执行的类路径。由于前面测试时将类放在了 src/main/java/filesystem 下,所以这里写的是 filesystem/HadoopFileOper。

原理分析

读文件

在JAVA版代码中,客户端调用了 FileSystem 类的 open 方法,打开希望读取的文件。然后通过 RPC 调用 namenode,确定文件的起始块的位置。然后到 datanode中读取对应的块。在分布式环境中,会从最近的 datanode 里获取数据。如果某个 datanode有故障,它会记住并在后续处理中不再从那里读取。

写文件

文件系统调用 create 方法创建文件,它会对 namenode 创建一个RPC调用,在文件系统的全名空间中新建一个文件,这时该文件中还没有相应的数据块。namenode 会执行不同的检查以确保这个文件不存在 以及客户端有新建该文件的权限。检查通过后 namenode会为该文件记录一条记录,否则会抛出一个IOException 异常。

写入数据时,数据流将文件分成一个个的数据包,并写入内部数据队列。系统会从 datanode 中挑一组出来进行数据的存储,并要求 namenode 分配新的数据块。如果要存在多个 datanode,它是先传到第一个,存储完后第一个 datanode 把数据包发往第二个,依此类推。所有节点都存储完成后会从队列中删除数据包。

如果写入时发生故障,会停止向 datanode 的传输,然后把数据队列中的所有数据包都添加回数据队列的最前端,保证故障节点下流的 datanode 不会漏掉任何一个数据包。并且为正常存储的 datanode 进行标识。方便故障恢复后的处理。

如果写入时多个 datanode 同时故障,只需要成功写入 dfs.namenode.replication.min 个就会认为是成功。

客户端完成写入后,对数据流调用 close() 方法。该操作将剩余的所有数据写入 datanode 管线,并联系 namenode,告知其文件写入完成,等待确认。namemode 已经知道文件由哪些块组成,所以它在返回成功前只需要等待数据块进行最小量的复制。

写入的一致性 文件写入后,它能在文件系统的全名空间中立即可见,但内容不一定。即使数据流已经刷新并存储,可能文件内容长度还是 0.因为当写入的数据超过 1 个块后,第一个数据块对新的 reader 是可见的,之后的则不是。正在写入的块对其他 reader 是不可见的。

HDFS 提供一种强行刷新数据到 datanode 中的手段,即调用 FSDataOutputStream 的 hflush()方法。它是往各 datanode 的内存中刷新,不保证存储到了硬盘上,如果此时断电等,数据可能丢失。如果要保证刷新到硬盘,可以使用 hsync() 方法。HDFS的关闭文件操作其实就隐含了 hflush() 方法。在实际应用中可以考虑安全性,选择适合的刷新方式。

并行复制

使用 cp 命令,可以在命令行中对 hdfs 中的文件进行复制。但如果是大量文件,这种方式效率太低。需要用到 distcp 这种并行处理的方式。如:

// 复制文件
hadoop distcp file1 file2
// 复制目录,如果 dir2 不存在,会新建 dir2
// 如果 dir 存在,dir1 的内容会复制到 dir2 下,变为:dir2/dir1
// 如果只是想复制内容,可以使用 -overwrite 选项
hadoop distcp dir1 dir2

distcp 是作为一个 MapReduce 作业实现的。实际上这个任务只有 map,没有 reduce。每个文件通过一个 map 任务进行复制。并且为每一个 map 分配大致相等的数据(将文件划分为大致相等的块)。默认有 20 个 map 被使用,也可以通过 distcp 的 -m 参数进行设定。在集群中可以通过这上参数来配置均衡

还可以用 distcp 命令在两个HDFS集群间传送数据,如:

// -delete 会删除目标路径中和源路径不同的内容
// -p 会保留源文件的权限,块大小和复本数
hadoop distcp -update -delete -p hdfs://host1/dir hdfs://host2/dir2

如果存在两处HDFS版本不一致,可以用 webhdfs 协议:

hadoop distcp -update -delete -p webhdfs://host1:50070/dir webhdfs://host2:50070/dir2

Hadoop I/O

hadoop 提供一套原子化的操作。方便进行处理,特别是大数据量的操作。

数据校验

在数据量非常大(TB级别)时,处理过程中很可能出现数据损坏。Hadoop 引入了检验机制。在数据第一次引入系统时会计算校验和(checksum),数据读取时,再次计算。如果两者对不上,表示数据损坏了。checksum 是通过 CRC-32C 方式计算得到的。

datanode 负责在收到数据后存储数据及其检验和的比对。用户上传文件或者从其它 datanode 复制数据时会执行该操作。数据及校验和会一起发送到多个 datanode 组成的管线,然后今次进行数据存储。管线中最后一个 datanode 负责验证校验和。如果检测到有问题,会抛出异常。

客户端从 datanode 读取数据时也会进行验证。会和 datanode 中存储的校验和进行比较。如果检测到错误,会向 namenode 报告已损坏的数据块,namenode 会抛出 ChecksumException 异常,并将该数据块标记为损坏,这样该块就不会再被读取到,后面会删除这些块。

每个 datanode 都保存有一个用于验证和校验的日志,记录了每个数据块的最后一次验证时间。客户端成功验证一个数据块后,会告诉 datanode,该日志就会被更新。这些日志内容对检测损坏磁盘很有用

除了数据写入、读取时会有校验,每个 datanode 在后台也会运行一个任务,定期验证存储的所有数据块。

数据压缩

对文件进行压缩可以大幅减少文件大小,减少存储空间同时传输需要的时间也少。Hadoop 可以结合几种常见的压缩文件:

deflate, gzip, bzip2, LZO, LZ4, Snappy

在使用时,每种压缩方式都提供了 9 档压缩比。压缩比越大,速度越慢,压缩文件越小。

gzip 是一个能用的压缩工具,在空间/时间/性能比较平衡。bzip2 压缩能力强于 gzip,但速度慢一点。LZO, LZ4 和 Snappy 优化了压缩速度,其速度比 gzip 快一个数量级,但压缩效率稍差一点。Snappy 和 LZ4 的解压速度比 LZO 高出许多。五种压缩格式中只有 bz2 支持切割。也就是说,它压缩的文件可以被切割成小文件,存储在不同的数据块中。

使用压缩

在 Hadoop 中,对各种压缩方式都有实现。如:GzipCodec, BZip2Codec, SnappyCodec 等。

数据流压缩: CompressionCodec

对数据流压缩用到 CompressionCodec 类。如:

HadoopCompression.java

package filesystem;

import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;

public class HadoopCompression {

    public static void streamCompression() throws Exception {
        DefaultCodec codec = new GzipCodec();
        Configuration conf = new Configuration();
        codec.setConf(conf);

        CompressionOutputStream out = codec.createOutputStream(System.out);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish();
    }

    public static void main(String[] args) throws Exception {
        streamCompression();
    }
}

编译项目:

mvn package

然后在命令行里执行:

echo 'test stream' | hadoop jar target/hadoop-1.0-SNAPSHOT.jar filesystem/HadoopCompression | gunzip

可以看到,我们读取命令行里的输入,然后在用 gzip 进行压缩,然后把结果输出到标准输出,再在命令行里通过 gzip 进行解压。还可以使用其它的压缩方式。

文件压缩: CompressionCodecFactory

先在系统里创建一个压缩文件并上传到 hdfs 里:

gzip test.txt
hadoop fs -mkdir -p /home/hdfs
hadoop fs -put test.txt.gz /home/hdfs/testgzip.gz
hadoop fs -ls /home/hdfs
Found 1 items
-rw-r--r--   1 root supergroup        136 2018-12-04 14:41 /home/hdfs/testgzip.gz

编写JAVA代码如下:

package filesystem;

import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;

import java.net.URI;

public class HadoopCompression {

    public static void fileCompression() throws Exception {
        String uri = "hdfs://localhost:9000/home/hdfs/testgzip.gz";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // 自动通过后缀名检测应该使用哪种压缩方式
        // 也可以强制指定某一种压缩方式
        // DefaultCodec codec = new GzipCodec();
        // codec.setConf(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
            System.err.println("未检测到压缩方式: " + uri);
            System.exit(1);
        }

        String outputUri =
                CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

        InputStream in = null;
        OutputStream out = null;
        try {
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            IOUtils.copyBytes(in, out, conf);
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
    }

    public static void main(String[] args) throws Exception {
        fileCompression();
    }
}

打包并执行:

mvn package
hadoop jar target/hadoop-1.0-SNAPSHOT.jar filesystem/HadoopCompression
18/12/04 14:42:34 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
18/12/04 14:42:34 INFO compress.CodecPool: Got brand-new decompressor [.gz]

hadoop fs -ls /home/hdfs
Found 2 items
-rw-r--r--   1 root supergroup      10240 2018-12-04 14:42 /home/hdfs/testgzip
-rw-r--r--   1 root supergroup        136 2018-12-04 14:41 /home/hdfs/testgzip.gz

可以看到,压缩包被解压了,名称是去掉原压缩文件后缀名后的名字。可以查看文件中的内容:

hadoop fs -cat /home/hdfs/testgzip

原生类库

除了使用 hadoop 包里的各个压缩包外,JAVA本身也带了一些原生的压缩包可以使用。而且,使用原生的包在效率上会有约 10% 的提升。原生方法是指JAVA会调用操作系统中C语言中的方法去实现。不同压缩方式是否有JAVA压缩和原生压缩,是不同的:

压缩格式 是否有JAVA库 是否有原生实现
gzip
bzip2
LZO
LZ4
SnappyLZO

hadoop 会自动搜索原生库并调用。有时候如果想禁用原生代码,可以通过修改配置文件 core-site.xml 中 io.native.lib.available 的值,改为 false 即可。

CodecPool

如果要执行大量的压缩和解压,并且我们想使用原生库。可以使用 CodecPool 压缩池,类似数据库的连接池。它可以节省创建压缩对象时的开销。使用如下:

public static void codecPool() throws Exception {
        Configuration conf = new Configuration();
        DefaultCodec codec = new GzipCodec();
        codec.setConf(conf);

        Compressor compressor = CodecPool.getCompressor(codec);
        CompressionOutputStream out = codec.createOutputStream(System.out, compressor);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish();
        CodecPool.returnCompressor(compressor);
    }

代码从标准输入读取数据,后输出到标准输出。

MapReduce 中压缩

在 MapReduce 任务中使用压缩要复杂一些。因为 HDFS 一个数据块是 128M,对于一些大文件,可能存储在不同的数据块中。比如一个压缩前 1G的文件,它被存储在 8 个块中。如果把这些文件作为 MapReduce 任务的输入数据,可以创建 8 个输入分片,每个分片作为一个单独的 map 任务。

如果文件是用 gzip 压缩的,且压缩后文件大小为 1G。HDFS同样会把它入在 8 个数据块中。但我们无法把每个数据块放在单独的 map 任务中,因为 gzip不支持分片,单独一个数据块是无法成功解压的。所以这 8 个块会在一个 map 任务中执行,显然效率就很低了。

只有 bzip2 压缩支持切分。LZO 支持通过索引实现切分。

MapReduce 任务在读取输入内容时如果发现文件是压缩的,它会自动解压。

// 定义任务各参数
Configuration conf = new Configuration();
// Map 任务输出结果是否压缩
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
// 设置输出结果压缩方式.这里使用 gzip
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class,
        CompressionCodec.class);

数据序列化

序列化是指将结构化对象转化为字节流,以便在网络传输或写到硬盘上进行永久存储。而将这种数据再转换成对象则被称为反序列化。

序列化在分布式中被用于进程间通信和永久存储。Hadoop 中,系统中多个节点的进程间通信是通过RPC进行的,它实际上是将对象序列化成二进制流发送到远程节点,然后再反序列化为对象进行调用。

Hadoop 中通过实现 Writable 接口可以创建自己的可序列化类。系统也提供许多现成的类,比如基础的几种类型:BooleanWritable, ByteWritable, ShortWritable, IntWritable, VintWritable(变长的 int 型), FloatWritable, LongWritable, VlongWritable(变长的 long 型), DoubleWritable, Text, BytesWritable, NullWritable, ArrayWritable, MapWritable, SortedMapWritable 等.

我们在编写 MapReduce 程序时通常都是用这种 writable 的实现类,但也不是强制的。我们需要的是一种将对象字节之间转换的一种方式。

Writable

类似 IntWritable, TextWritable 都是实现原始的 WritableComparable 接口,它又继承自 Writable 及 Comparable。是一个可比较的。这个很重要,因为在 MapReduce 中有个很重要的步骤,是基于键的排序。

键的排序是由 RawComparator 控制的。它的规则是:

  1. 若属性 mapreduce.job.output.key.comparator.class 已经显式设置了,或者通过 Job 类的 setSortComparatorClass() 的方式设置了,就使用该类的实例。
  2. 如果未设置,且键是 WritableComparable 的子类,就用该类型自己的 comparator。
  3. 如果都不是,就使用 RawComparator。它将字节流反序列化为一个对象,再由 WritableCoparable 的 comPareTo() 方法进行操作。

所以,我们可以自定义一些键的类型,然后自己实现排序方式。这样可以方便业务实现。

另外可以思考一个问题,map 和 reduce 的键都是 Writable 的,那么我们就可以设计一些复杂一点的键,比如复合键,类似数据库的组合主键。

WritableComparable

对于 MapReduce 而言。类型很重要。因为它有一个基于键的排序阶段。所以它的键一定要是 Comparable 的。IntWritable 就是实现了原始的 WritableComparable,而它又继承自Writable 及 Comparable。如果我们要自己实现一个 Writable 类作为键,也要实现 WritableComparable类。

NullWritable

它是 Writable 的特殊类型,它序列化后长度为 0。它不从数据流读取数据,也不写入数据,只是一个占位符。

ObjectWritable, GenericWritable

ObjectWritable 是对 JAVA基本类型 String, enum, Writable, null 或这些类型组成的数组的一个通用的封装。当一个值可能存在有多种类型,ObjectWritable 就很有用。如:一个顺序文件的 key 类型固定,但 value 可能是文本,可能是数字,这样就可以声明为 ObjectWritable。但有一点不好的就是:每次序列化的时候得记录一下它是具体的哪一个实现类。

有一个简便的办法就是使用 GenericWirtable,它内部维护一个静态数组,把该字段可能的类型都保存起来,然后在声明的时候告诉它当前是哪种。这样,序列化的时候就少存很多东西。使用如下:

class MyGenericWritable extends GenericWritable{
 
	//无参构造函数
	public MyGenericWritable() {
		
	}
	
	//有参构造函数
	public MyGenericWritable(Text text) {
		super.set(text);
	}
	
	//有参构造函数.如果当前是存的 Long 型
	public MyGenericWritable(LongWritable longWritable) {
		super.set(longWritable);
	}
 
	
	@Override
	protected Class<? extends Writable>[] getTypes() {
		// 提前定义好可能存在的类型
		return new Class[]{LongWritable.class,Text.class};
	}
	
}

集合类

除了常规的类似 IntWritable, LongWritable 外,系统还提供一些集合类,类似组合键。如:ArrayWritable, MapWritable, 等。除了系统提供的,我们可以自己实现。如:

package mapreduce;

import java.io.*;

import org.apache.hadoop.io.*;

public class IntPair implements WritableComparable<IntPair> {

  private int first;
  private int second;
  
  public IntPair() {
  }
  
  public IntPair(int first, int second) {
    set(first, second);
  }
  
  public void set(int first, int second) {
    this.first = first;
    this.second = second;
  }
  
  public int getFirst() {
    return first;
  }

  public int getSecond() {
    return second;
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(first);
    out.writeInt(second);
  }

  public void readFields(DataInput in) throws IOException {
    first = in.readInt();
    second = in.readInt();
  }
  
  @Override
  public int hashCode() {
    return first * 163 + second;
  }
  
  @Override
  public boolean equals(Object o) {
    if (o instanceof IntPair) {
      IntPair ip = (IntPair) o;
      return first == ip.first && second == ip.second;
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }
  
  public int compareTo(IntPair ip) {
    int cmp = compare(first, ip.first);
    if (cmp != 0) {
      return cmp;
    }
    return compare(second, ip.second);
  }
  
  /**
   * Convenience method for comparing two ints.
   */
  public static int compare(int a, int b) {
    return (a < b ? -1 : (a == b ? 0 : 1));
  }
  
}

这里实现了一个整数对的类型。相当于两个 int 型的复合主键。当然,我们也要实现自己的 compareTo 方法来实现排序规则。另外还要重写 hashCode(),equals(), toString() 方法。因为分区器通常用 hashCode() 方法来对数据进行分区。通过这个函数要尽量让分到每个 reduce 的数据平均。

####序列化框架 Avro TODO

基于文件的数据结构

有时间在一些特殊的应用中,需要用到一些特殊的数据结构。比如日志分析时,每条记录作为一个键值对数据,将访问时间作为键。由于键和值都要进行持久存储到文件中。所以它们的值都要是可序列化和反序列化的。

SequenceFile

可以把SequenceFile当做是一个容器,把所有的文件打包到SequenceFile类中可以高效的对小文件进行存储和处理。是Hadoop用来存储二进制形式的[Key,Value]对而设计的一种平面文件。

SequenceFile文件并不按照其存储的Key进行排序存储,SequenceFile的内部类Writer提供了append功能。

SequenceFile中的Key和Value可以是任意类型Writable或者是自定义Writable。

在存储结构上,SequenceFile主要由一个Header后跟多条Record组成,Header主要包含了Key classname,value classname,存储压缩算法,用户自定义元数据等信息,此外,还包含了一些同步标识,用于快速定位到记录的边界。

每条Record以键值对的方式进行存储,用来表示它的字符数组可以一次解析成:记录的长度、Key的长度、Key值和value值,并且Value值的结构取决于该记录是否被压缩。

SequenceFile优点:

  1. 支持基于记录(Record)或块(Block)的数据压缩。
  2. 支持splitable,能够作为MapReduce的输入分片。
  3. 修改简单:主要负责修改相应的业务逻辑,而不用考虑具体的存储格式。
package filesystem;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.net.URI;

public class SequenceFileOper {

    private static final String[] DATA = {
            "One, two, buckle my shoe",
            "Three, four, shut the door",
            "Five, six, pick up sticks",
            "Seven, eight, lay them straight",
            "Nine, ten, a big fat hen"
    };

    public static void writeFile() throws IOException {
        String uri = "hdfs://localhost:9000/home/hdfs/number.txt";
        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;

        try {
            writer = SequenceFile.createWriter(fs, conf, path,
                    key.getClass(), value.getClass());

            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                // 每一行的内容是:key, value
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }

    public static void readFile() throws IOException {
        String uri = "hdfs://localhost:9000/home/hdfs/number.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            // 通过读取行内容确定键和值分别是哪种类型,从而通过反射得到对应的处理类
            Writable key = (Writable)
                    ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable)
                    ReflectionUtils.newInstance(reader.getValueClass(), conf);

            long position = reader.getPosition();
            // 顺序读取
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : "";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition();
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }

    public static void main(String[] args) throws IOException {
        readFile();
    }
}

在命令行中可以如下查看文件中的内容:

hadoop fs -text /home/hdfs/number.txt | head -n 10

MapFile

MapFile 是已经排过序的 SequenceFile,它有索引,所以可以按键查找。而索引本身就是一个 SequenceFile,包含了 map 中的一小部分键。

使用起来差不多:

String uri = "hdfs://localhost:9000/home/hdfs/mapnumber.txt";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);

IntWritable key = new IntWritable();
Text value = new Text();
MapFile.Writer writer = null;
try {
  writer = new MapFile.Writer(conf, fs, uri,
      key.getClass(), value.getClass());
  
  for (int i = 0; i < 1024; i++) {
    key.set(i + 1);
    value.set(DATA[i % DATA.length]);
    writer.append(key, value);
  }
} finally {
  IOUtils.closeStream(writer);
}

存储结构

顺序文件由文件头和随后的一条或多条记录组成。文件的前三个字节为 SEQ,其后的一个字节表示文件的版本号。以及其它字段,如:键和值类的名称,数据压缩情况,用户定义的元数据及同步标识。

同步标识用于在读取文件时能够从做生意位置开始识别记录边界。每个文件都有一个随机生成的同步标识,其值存储在文件头中。同步标识位于文件中的记录与记录之间。

记录的内部结构取决于是否启用压缩以及何种压缩。总共有三种情况:不压缩、记录压缩、数据块压缩。

如果不启用压缩,每条记录由记录长度(4字节长的整数)、键长度、键、值组成。

记录压缩和无压缩情况基本相同,不过记录的值是用文件头中定义的 codec 压缩的值。要注意:key 没有被压缩

块压缩是指一次性压缩多条记录。压缩率更高,可以不断向数据块中压缩记录直到块的字节数不小于 io.seqfile.compress.blocksize属性设置的值。该值默认是 1MB。每一个新块的开始处都要插入同步标识。数据块的格式是: 记录字节数、压缩后的键长度、压缩后的键、压缩后的值长度、压缩后的值。

YARN

Yet Another Resource Nagotiator 就是 YARN,它是 Hadoop 集群资源管理系统。提供API给上层应用,然后调用底层的HDFS或HBASE处理。

YARN 启动后会运行两种守护进程。一种是资源管理器,一种是节点管理器(运行在集群所有节点上且能够启动和监控容器)。

容器用于执行特定应用程序的进程,每个容器都有资源限制(内存,CPU等)。一个容器可以是一个 Unix 进程也可以是一个 Linux cgroup。这取决于YARN的配置。

MapReduce

业务配置

Hadoop 项目中一些配置是通过自己的API来进行的。在代码中通过 Configuration conf = new Configuration();来获取。它表示项目中所有配置的一个集合。我们在配置文件 core-site.xml等几个配置文件中进行的配置都可以在代码中获取到。

除了几个安装是要设置的配置文件外,我们还可以添加自己额外的配置文件,放在其它位置。如:

conf-1.xml:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>size</name>
    <value>10</value>
    <description>Size</description>
  </property>
  
  <property>
    <name>weight</name>
    <value>heavy</value>
    <final>true</final>
    <description>Weight</description>
  </property>
  
  <property>
    <name>size-weight</name>
    <value>${size},${weight}</value>
    <description>Size and weight</description>
  </property>
</configuration>

将上面的配置文件放在项目目录里的 src/main/resource/ 目录下。

在JAVA代码中:

Configuration conf = new Configuration();
conf.addResource("conf-1.xml");
String size = conf.get("color");
System.out.printf(size);

打包项目,并通过 hadoop 运行测试就可以看到效果。

如果有多个配置文件配置了同一个值,后 add 进去的值会覆盖前面的。除非在配置文件中将某属性的 final 值设为 true,它就不会被覆盖:

<property>
	<name>weight</name>
	<value>heavy</value>
	<final>true</final>
	<description>Weight</description>
</property>

Hadoop配置

有时候项目开发的时候是在单机环境下进行,测试的时候是在伪分布式环境,而生产环境则是集群环境。不同环境下的配置文件不大相同。为了方便,我们可以配置多套,并在不同时候切换不同配置。

默认情况下,hadoop 是读取的它的安装目录下 etc/hadoop 下的配置文件。如:

hadoop fs -conf conf/hadoop-localhost.xml -ls /

还有一种方法是将不同环境的配置文件放在不同目录,但名称相同。然后通过 HADOOP_CONF_DIR这个环境变量去控制目录。这样就不用每个命令都加 -conf配置。

如:在 hadoop 安装目录下创建 conf 目录,并在里面创建配置文件: hadoop-localhost.xml 内容是:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<property>
		<name>fs.defaultFS</name>
		<value>hdfs://localhost:9000</value>
		<description>NameNode</description>
	</property>
	<property>
    		<name>hadoop.tmp.dir</name>
    		<value>/usr/local/hadoop2.9.1/data</value>
  	</property>

	<property>
                <name>mapreduce.framework.name</name>
                <value>yarn</value>
        </property>

	<property>
                <name>yarn.resourcemanager.hostname</name>
                <value>0.0.0.0</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>

	<property>
        	<name>authname</name>
        	<value>sunyu</value>
        	<description>Size</description>
    	</property>
</configuration>

该配置文件是将之前的三个配置文件做了一个整合,并添加了一个新的配置项 authname 以示区别。

通过 conf.get("authname") 无法获得我们通过 -conf 命令设置的配置文件里的参数。要想获得,需要通过一个辅助类:ToolRunner。

用法如:

package mapreduce;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;

import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;

public class HdpConfig extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        for (Entry<String, String> entry : conf) {
            System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new HdpConfig(), args);
        System.exit(exitCode);
    }
}

这里打印了所有的配置参数,打包项目并在 hadoop中运行:

hadoop jar target/hadoop-1.0-SNAPSHOT.jar mapreduce/HdpConfig -conf /usr/local/hadoop2.9.1/conf/hadoop-localhost.xml  | grep authname

authname=sunyu

成功获得到。另外,命令行还支持属性的传递,如:

hadoop jar target/hadoop-1.0-SNAPSHOT.jar mapreduce/HdpConfig -conf /usr/local/hadoop2.9.1/conf/hadoop-localhost.xml  -D authname=sun86yu

通过 -D 参数,将我们想设置的参数传递过去。而且这里设置的优先级是高于配置文件中的。所以:我们可以把默认属性放入配置文件中,通过 -D 来设置需要自定义的参数

MRUnit测试

在 MapReduce 中,map 和 reduce 分开进行测试是很非常的。MRUnit 可以帮助我们进行测试,将书籍的输入传递给 mapper 或者检查 reduce 的输出是否符合预期。

这个功能要用到 mrunit 包,所以要在项目的 pom.xml 里加上相应的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>sunyu</groupId>
    <artifactId>hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- Component versions are defined here -->
        <hadoop.version>2.9.1</hadoop.version>
        <mrunit.version>1.1.0</mrunit.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>${mrunit.version}</version>
            <classifier>hadoop2</classifier>
        </dependency>
    </dependencies>

</project>

temptest/NcdcRecordParser.java

package temptest;

import org.apache.hadoop.io.Text;

// 温度数据解析类
public class NcdcRecordParser {

    // 如果温度是 9999 表示温度数据已经丢失
    private static final int MISSING_TEMPERATURE = 9999;

    private String year;
    private int airTemperature;
    private boolean airTemperatureMalformed;
    private String quality;

    public void parse(String record) {
        year = record.substring(15, 19);
        airTemperatureMalformed = false;
        if (record.charAt(87) == '+') {
            airTemperature = Integer.parseInt(record.substring(88, 92));
        } else if (record.charAt(87) == '-') {
            airTemperature = Integer.parseInt(record.substring(87, 92));
        } else {
            airTemperatureMalformed = true;
        }
        quality = record.substring(92, 93);
    }

    public void parse(Text record) {
        parse(record.toString());
    }

    public boolean isValidTemperature() {
        return !airTemperatureMalformed && airTemperature != MISSING_TEMPERATURE
                && quality.matches("[01459]");
    }

    public boolean isMalformedTemperature() {
        return airTemperatureMalformed;
    }

    public String getYear() {
        return year;
    }

    public int getAirTemperature() {
        return airTemperature;
    }
}

temptest/MaxTempMapper.java

package temptest;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    enum Temperature {
        MALFORMED
    }

    private NcdcRecordParser parser = new NcdcRecordParser();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        parser.parse(value);
        if (parser.isValidTemperature()) {
            int airTemperature = parser.getAirTemperature();
            context.write(new Text(parser.getYear()), new IntWritable(airTemperature));
        } else if (parser.isMalformedTemperature()) {
            System.err.println("忽略可能损坏的输入数据: " + value);
            context.getCounter(Temperature.MALFORMED).increment(1);
        }
    }
}

temptest/MaxTemperatureReducer.java

package temptest;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context)
            throws IOException, InterruptedException {

        int maxValue = Integer.MIN_VALUE;
        // 当前得到的输入是年份对应的温度列表,需要从中找到当前年份的最大温度
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

* temptest /HdpTest.java*:

package temptest;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;

import java.util.*;

import org.junit.Test;

import java.io.IOException;

public class HdpTest {

    @Test
    public void testMap() throws IOException, InterruptedException {
        Text value = new Text("0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N" +
                "0000001N9-00781+99999102001ADDGF108991999999999999999999");

        new MapDriver<LongWritable, Text, Text, IntWritable>()
                .withMapper(new MaxTemperatureMapper())
                .withInput(new LongWritable(0), value)
                .withOutput(new Text("1901"), new IntWritable(-78))
                .runTest();
    }

    @Test
    public void testReduce() throws IOException, InterruptedException {
        new ReduceDriver<Text, IntWritable, Text, IntWritable>()
                .withReducer(new MaxTemperatureReducer())
                .withInput(new Text("1901"),
                        Arrays.asList(new IntWritable(10), new IntWritable(5)))
                .withOutput(new Text("1901"), new IntWritable(10))
                .runTest();
    }
}


在 HdpTest.java 上点右键,并 Run,可以看到运行的结果:

Process finished with exit code 0

表示运行成功,如果把代码中预期的温度改成 -74 可以看到结果:

java.lang.AssertionError: 1 Error(s): (Missing expected output (1901, -74) at position 0, got (1901, -78).)

表示预期 -74,但实际解析得到的值是 -78。

本地运行

编写一个任务处理类。可以参照之前 ToolRunner 的做法,方便处理命令行里的参数。

MaxTemperatureDriver.java

package mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import temptest.MaxTemperatureReducer;
import temptest.MaxTemperatureMapper;

public class MaxTemperatureDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("使用方式: %s [控制参数] <输入数据路径> <结果输出路径>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Job job = new Job(getConf(), "Max temperature");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        // 设置 combiner 加速处理.节省带宽
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
    }
}

在准备测试数据时,我们已经把 4 年的数据整理出来,并上传到 HDFS 的 /home/hdfs/fouryear.txt.gz。这里可以直接使用这个数据。Hadoop会自动解压已经压缩的输入内容

打包项目文件,并执行项目:

mvn package
hadoop jar target/hadoop-1.0-SNAPSHOT.jar mapreduce/MaxTemperatureDriver /home/hdfs/fouryear.txt.gz /home/hdfs/output/maxtemp4/

运行时会显示 map, reduce 的进度。完成后可以到HDFS里看结果:

[root@localhost hadoop]# hadoop fs -ls -R /home/hdfs/
-rw-r--r--   1 root supergroup     290515 2018-12-06 14:23 /home/hdfs/fouryear.txt.gz
drwxr-xr-x   - root supergroup          0 2018-12-06 14:25 /home/hdfs/output
drwxr-xr-x   - root supergroup          0 2018-12-06 14:25 /home/hdfs/output/maxtemp4
-rw-r--r--   1 root supergroup          0 2018-12-06 14:25 /home/hdfs/output/maxtemp4/_SUCCESS
-rw-r--r--   1 root supergroup         36 2018-12-06 14:25 /home/hdfs/output/maxtemp4/part-r-00000

[root@localhost hadoop]# hadoop fs -text /home/hdfs/output/maxtemp4/part-r-00000
1901	317
1902	244
1903	289
1904	256

可以看到结果。和最早我们用 shell 处理出来的一样。可以尝试多上传几年测试数据试试。

调优

  1. 查看 mapper 运行时长,如果一个 mapper 只运行几秒,可以尝试减少 mapper 数量。
  2. 查看 reducer 数量是否超过 1 个。
  3. map 输出的内容进行压缩可以提升效率。
  4. 充分利用 combiner 减少通过 shuffle 传输的数据量。

集群上带宽是有限的,如果执行大IO任务,要尽量让 map 和 reduce 间传输数据少。一方面可以压缩,另一方面可以通过 combiner 的方式。将 map 输出的结果优化。如:

在找最高温度的示例中,假如有个 map 的输出是 (1901, 0)(1901, 20) (1901, 10)三组数据,第二个 map 输出 (1901, 25)(1901,15)。如果不做处理,这些数据都会被传到 reduce上。但实际上,第一个 map 只需要取最大的值 (1901, 20),第二个 map 只用 (1901, 25) 就可以了。

可以看到这里的逻辑和 Reduce 本身的逻辑是一致的,所以我们可以直接指定 reduce 类为我们的 combiner 类。但诸如求平均值及其它一些业务时可能就要自己重新定义 combiner 类了,它是通过 Reducer 类来定义的,并且可以反复运行多次,不会影响结果

设置 combiner 的方式是:job.setCombinerClass(MaxTemperatureReducer.class);

在处理复杂的业务时,我们要将业务拆分成很多个小的工作来进行。尽量不要让代码中有复杂的业务处理。

比如现在要找到某一天(1月1号)最高气温的平均值。这就要从所有的年份中去查1月1号的记录,找到每年的最高气温,然后再求平均。

工作原理

提交一个 MapReduce 任务,到它执行。总共涉及到五个部分:

  1. 客户端。用来提交任务。
  2. YARN 资源管理器。负责资源调配。如:分配容器,用来运行作业。
  3. YARN 节点管理。负责启动和监视集群中机器上的容器。
  4. Application Master,负责协调运行任务。和作业任务在容器中运行。
  5. HDFS,用来共享作业文件。

整体流程是:

  1. 客户端提交作业。
  2. 资源管理器分配容器
  3. 容器分配 map 和 reduce 任务给节点

客户端–提交作业

客户端提交任务后,每秒会轮询作业的进度。如果发现自上次报告后状态有改变,就会在控制台显示。提交任务的细节:

  1. 向资源管理器请求一个新的应用ID。
  2. 检查作业输出参数。如果没有指定输出目录或者目录已经存在,会提示错误。
  3. 计算作业的输入分片。如果输入参数有误则提示错误。
  4. 将运行作业需要的资源(JAR,配置文件,输入分片等)复制到一个以作业ID全名的目录下的共享文件系统中。供各作业节点使用。
  5. 调用资源管理器的 submitApplication()方法提交作业。

资源管理器–作业初始化

资源管理器接收到提交任务的消息后,将请求传递给 YARN 调度器。调度器分配一个容器,资源管理器通过节点管理器,在容器中启动 application master 进程。

Application master 是一个 JAVA程序,它的主类是 MRAppMaster,它接受来自任务的进度和完成报告。现在,它接受来自共享文件系统里,当前任务的输入分片信息,对每一个分片创建一个 map 任务对象以及多个 reduce 任务对象(个数由代码的 setNumReduceTasks()方法设置)。

Application master要决定如何运行构成当前作业的任务。如果作业很小,就和自己在同一个JVM上运行。如果任务很大,就需要向资源管理器请求新的容器,在新容器中启动任务专用JVM。

小作业是指小于 10 个 mapper,且只有 1 个 reducer ,且输入大小小于一个 HDFS 块的作业。(通过设置 mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces, mapreduce.job.ubertask.maxbytes 可以改变这几个值)。也可以直接将 mapreduce.job.ubertask.enable 设置为 true 来强行启用 Uber 任务(和Application master在同一JVM运行)

节点管理–任务分配

如果作业初始化时,无法作为 Uber任务,Application master 就要向资源管理器请求新的作业用的容器。资源管理器也是通过节点管理器来管理新容器的,会在容器里启动 Map 或 Reduce 任务。

Map 任务请求的先做级要高于 reduce 任务。因为 map 任务必须在 reduce 的排序阶段能启动前完成。直到有 5% 的 map 任务已经完成时,为 reduce 请求容器的消息才会发出。

reduce 任务能在集群中做生意位置运行,但 map 任务有着数据本地化局限。意思是任务会尽量在数据分片所在的节点上运行。请求也为任务指定了内存需求,CPU需求等。默认时每个 map 任务和 reduce 任务都会分配 1G 内存和一个虚拟的内核。这些值也是可配置的。

节点管理–任务执行

Application master 为任务请求到容器后,会通过节点管理器启动容器。并在容器上运行一个JAVA程序,主类是 YarnChild。在运行任务前会把需要的资源本地化,包括作业的配置、JAR、分布式缓存文件。

YarnChild 在指定的 JVM 中运行。因此用户定义的 map 或 reduce 函数如果有任何缺陷,是不会影响到节点管理器的。

MapReduce 的作业的运行时间可能很长,达到几个小时。作业在运行时需要对运行状态进行跟踪。这个任务是由 Application master 完成的。

对Map 任务而言,任务进度是已处理输入所占的比例;对 reduce 任务,由于过程分三步:复制,shuffle, 排序。

在任务运行时,客户端每秒请求一次 Application master,以接收最新的状态。当最后一个任务已完成后,便把作业状态设置成成功。Application master 和任务容器会清理其工作状态包括中间输出。

任务失败

在实际应用中,经常会碰到各种出错。如:代码出错、进程崩溃、机器故障等。Hadoop 的好处就是它能处理各种故障。我们要关注的是这几种问题:任务失败、Application master 出错、节点管理器出错、资源管理器出错。

任务失败

任务出错通常是用户代码抛出异常,这时JVM会在退出之前向Application master发送错误报告,报告会记入日志。任务退出,节点会释放资源。有时JVM也会出错,这时节点管理器会检测到,并将任务标记为失败。对于超时的任务(通常是 10 分钟,通过 mapreduce.task.timeout 设置),也会将任务票房为失败,并将对应的进程杀死。

Application master 收到任务失败的信息后,将重新调度该任务的执行。并试图避免在失败过的节点管理器上再次调度任务。一个map任务如果失败 4 次,就不会再重试(通过 mapreduce.map.maxattempts 设置)。reduce 任务的重试次数则通过 mapreduce.reduce.maxattempts 设置。默认情况下,map 或 reduce 任意一个重试超过 4 次都会认为整个任务失败。

对有一些任务,我们还可以设置一个通过比。意思就是只要失败的比例不超过多少,就重试。该值通过 mapreduce.map.failures.maxpercent 和 mapreduce.reduce.failures.maxpercent 设置。

Application master出错

Application master 控制任务的启动,数据跟踪。它也可以有几次重试机会。默认是 2,可以通过 yarn.resourcemanager.am.maxattempts设置。重试过程是:

Application master 向资源管理器发送周期性心跳信息,当它出错时,资源管理器将检测到,并在一个新的容器(由节点管理器管理)中开一个新的 Application master 实例。对于运行到一半的任务,则通过作业历史来恢复,这样就不用重头开始再运行了。默认情况下这个功能是开启的,可以通过 yarn.app.mapreduce.am.job.recovery.enable设置为 false 来禁用。

Mapreduce 客户端会每秒向 Application master 轮询任务进度,如果 Application master 出错了,客户端就要定位到新的实例。客户端是在任务初始化的时候知道并缓存 Application master 的地址的,如果 AM 运行一半时出错,客户端发送的轮询就会超时,这时候客户端会向资源管理器请求新的 AM 地址。

节点管理器出错

节点管理器是受资源管理器控制,用来创建容器并执行相应的任务的(运行 AM 或者 Map, Reduce)。如果它由于内部任务或者其它原因崩溃了,就会停止向资源管理器发送心跳信息。如果 10 分钟内(可以通过 yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms设置,以毫秒为单位)没有收到一条心跳信息,资源管理器会把该节点从自己的节点池中移除。在失败节点管理器上运行的任务会在新的节点上运行。

对于在出错节点管理器上运行成功了的 map 任务,如果该任务还没全部完全,那么 AM 会安排它们重新运行。因为 map 任务的中间输出可能还在出错的节点的文件系统中,可能无法被后续的 reduce 访问到。

如果应用程序的运行失败次数过高,那么节点管理器可能会被拉黑。对于 MapReduce,如果一个节点管理器上有超过三个任务失败,AM 就会尽量将任务调度到不同的节点上。可以通过 mapreduce.job.maxtaskfailures.per.tracker自己设定这个数字。

资源管理器出错

资源管理器是最核心的组件,没有它,节点管理器无法启动。默认配置下,资源管理器是单点的,所有运行的作业都失败,而且无法恢复。所以,为了保证高可用,需要对资源管理器做一个双机热备。

出错后,新资源管理器接管系统,它从状态存储区中读取应用程序信息,然后为系统中运行的所有应用程序重新启动 AM。而且这些过程是不记入重试次数的。这种备机切换是由故障转移控制器处理的。

shuffle 和排序

Map - Reduce 执行的过程是 Reduce 接收 Map 的结果作为输入。但不是结果直接就传过去,而是要经过排序。这个过程就是 shuffle。这个步骤通常用来优化 MapReduce 程序。

各个步骤具体的过程很复杂:

Map

map 过程开始产生输出时,会先写到内存的缓冲区中并进行预排序。缓冲区的大小可以自己设置(mapreduce.task.io.sort.mb),缓冲区使用率达到一定值(mapreduce.map.sort.spill.percent,默认是 80%)时,一个后台线程就会把内容溢出(spill)到磁盘上。在此过程中,map 还会持续往缓冲区写入内容。如果在 spill 的过程中缓冲区又满了,map 会阻塞直到写磁盘过程完成。每次缓冲区满,spill 的时候都会生成一个新的文件。所以,map 任务完成后会有很多个这种 spill 文件。

在写入 spill 之前,线程会先根据数据最终要传到的 reducer ,把数据划分成相应的分区。在内存中还要按键排序然后应用 combiner 函数,这样可以使输出更紧凑。

map 运行结束后,spill 文件会合并成一个已经分区,且排好序的输出文件。分区的目的是为了方便传递给对应的 reduce。同时,还可以把输出文件进行压缩(通过 mapreduce.map.output.compress 设置为 true 开启,还可以指定相应的 codec)。

如果 spill 的文件数目不小于 3 个(通过 mapreduce.map.combine.minspills 设置),combiner 就会在输出文件写到磁盘之前再次运行。文件太少时应用 combiner 反而会降低性能。

Reduce

Map 任务的输出文件位于执行 map 任务的 tasktracker 的本地磁盘。但 reduce 任务可能需要不同 map 的输出。所以在执行任务前要先从别处复制需要的输入数据。

map 任务完成后,它们会使用心跳机制告诉他们的 AM。而 AM 知道 map 输出和主机位置之间的对应关系。reducer 中的一个线程会定期询问 AM,以便获取自己需要的数据对应的主机位置。

如果 map 的输出很小,会被复制到 reduce 任务的 JVM 内存。否则会被复制到磁盘上。和 map 输出时的机制一样,它也是判断缓冲区使用率,如果使用率达到一定值,就会往磁盘上 spill。spill 得到的多个文件最后会合并,并且在合并时应用 combiner。在这里,如果 map 输出时进行了压缩,会在内存中进行解压,要不然没法进行合并,排序。

来自多个 map 的数据都复制完成后,开始进入整体的合并及排序。合并采用的是循环方式,比如有 50 个 map 输出,而合并因子(mapreduce.task.io.sort.factor, 默认是 10)是 10,则会合并 5 次,一次 10 个文件。最后得到 5 个中间文件。

在实际合并中并不是每次合并都是执行同样的文件。比如有 40 个文件,可能第一次只合并 4 个文件,随后三次分别合并 10 个,最后一次将目前得到的 4 个文件和剩下的 6 个文件一起合并。总共用了 5 次合并,得到一个总的文件。但这样处理可以减少写到磁盘的数据量。

合并完成后就要执行 reduce逻辑,然后将结果写到输出文件系统,通常是 HDFS。

优化参数

任务执行中,通常是 shuffle 这个过程可以进行一些优化。为它提供更大的内存。常用的一些配置属性如下:

属性 默认值 功能介绍
mapreduce.task.io.sort.mb 100 map 输出时内存缓冲区大小。单位为MB
mapreduce.map.sort.spill.percent 0.8 内存缓冲区使用率达到多少时开始往磁盘上写文件
mapreduce.task.io.sort.factor 10 排序时一次最多合并的文件数,reduce 合并时也用到。
mapreduce.map.combine.minspills 3 map 合并输出时,spill 文件要执行 combine 的最少文件数
mapreduce.map.output.compress false 是否压缩 map 输出
mapreduce.map.output.compress.codec DefaultCodec map 结果输出时使用的压缩方式
mapreduce.shuffle.max.threads 0 节点管理器工作线程数,用于将 map 输出到 reducer。0 表示使用默认值,即 2 倍于可用的处理器数
mapreduce.reduce.shuffle.parallelcopies 5 用于把 map 输出复制到 reducer 的线程数
mapreduce.reduce.shuffle.maxfetchfailures 10 获取 map 输出最大时间,超出则认为获取失败
mapreduce.task.io.sort.factor 10 排序文件时一次合并的数量,同 map 端配置
macreduce.reduce.shuffle.input.buffer.percent 0.7 在 shuffle 复制阶段,分配给 map 输出的缓冲区占堆空间的百分比
mapreduce.reduce.shuffle.merge.percent 0.66 map输出缓冲区使用达到该比例后,启动合并输出到磁盘的过程

MapReduce特性

计数器

MapReduce提供了一些其它的服务,方便我们进行数据分析。计数器就是其中一个。我们可以通过它得知诸如:异常数据条数,总数据条数,任务数等等。我们可以用这些数据来调整业务逻辑,分析数据的方式,调整数据规模等。

MR 内置了许多计数器,我们也可以自己定义。内置的计数器分为几种类型:任务型、文件型、作业型、输入相关、输出相关

任务型计数器

在任务执行过程中,和任务相关的一些计数信息。由任务维护,定期发给 Application master 。常用的有:

计数器 简介 说明
MAP_INPUT_RECORDS map 输入的记录数 作业中所有 map 已处理的输入记录数。当记录传给 map() 函数时,该值递增
MAP_OUTPUT_RECORDS map 输出记录数 作业中所有 map 产生的输出记录数。每次调用 map 的 OutputCollect的 collect()方法时该值递增
MAP_OUTPUT_MATERIALIZED_TYTES map 输出物化字节数 map 输出后写到磁盘上的字节数,若 map 输出开启压缩功能,可在该值上反映出来
COMBINE_INPUT_RECORDS combine 输入的记录数 作业中所有 combine 已处理的输入记录数
COMBINE_OUTPUT_RECORDS combine 输出记录数 作业中所有 combine 产生的输出记录数
REDUCE_INPUT_RECORDS reduce 输出记录数 作业中所有 reduce 已处理的记录数,如果作业完毕,该值应该和 map 输出记录数相等
REDUCE_INPUT_GROUPS reduce 输入的组 作业中所有 reduce 已经处理的不同分组的个数,每当 reduce() 方法被调用,该数递增
REDUCE_OUTPUT_RECORDS reduce 输出记录数 作业中所有 reduce 输出记录数。调用reduce 的 OutputCollect collect() 方法时该数递增
REDUCE_SHUFFLE_BYTES reduce 经过 shuffle 的字节数 由 shuffle 复制到 reducer 的 map 输出的字节数
SPILLED_RECORDS 溢出的记录数 作业中 map 和 reduce 任务溢出到磁盘记录总数
CPU_MILLISECONDS CPU 毫秒 一个任务的总 CPU 时间,以毫秒为单位
PHYSICAL_MEMORY_BYTES 物理内存字节数 一个任务所用物理内存,以字节数为单位
GC_TIME_MILLIS GC 运行时间毫秒数 任务执行过程中垃圾收集器花费的时间
SHUFFLED_MAPS 由 shuffle 传输的 map 输出数 由 shuffle 传输到 reducer 的 map 输出文件数
FAILED_SHUFFLE 失败的 shuffle 数 shuffle 过程中,发生 map 输出拷贝错误的次数
MERGED_MAP_OUTPUTS 被合并的 map 输出数 shuffle 过程中,在 reduce 端合并的 map 输出文件数

文件型计数器

和文件系统相关的计数器。

计数器 简介 说明
BYTES_READ 文件系统读字节数 由 map 和 reduce 在各个文件系统中读取的字节数。文件系统可以是 HDFS, 本地等
BYTES_WRITTEN 文件系统写字节数 同读一样的,写
READ_OPS 文件系统读操作的数量 对文件读操作的数量,包括 open, file status 等
WRITE_OPS 文件系统写操作数量 对文件的写操作籹,包括 create, append 等
LARGE_READ_OPS 大规模读操作数量 在各文件系统老太太规模读操作数。如:对一个很大的目录进行 list 操作

输入型计数器

BYTES_READ, 由 map 任务通过 FileInputFormat 读取的字节数

输出型计数器

BYTES_WRITTEN,由 map 或 reduce 任务通过 FileOutputFormat 写的字节数

作业计数器

作业计数器由 Application Master 维护,因为它负责作业的各个启动。

计数器 说明
TOTAL_LAUNCHED_MAPS 启用的 map 任务数
TOTAL_LAUNCHED_REDUCES 启用的 reduce 任务数
TOTAL_LAUNCHED_UBERTASKS 启用的 mber 任务数
NUM_UBER_SUBMAPS uber 任务中的 map 任务数
NUM_UBER_SUBREDUCES uber 任务中的 reduce 任务数
NUM_FAILED_MAPS 失败的 map 任务数
NUM_FAILED_REDUCES 失败的 reduce 任务数
NUM_FAILED_UBERTASKS 失败的 uber 任务数
NUM_KILLED_MAPS 被中止的 map 任务数
NUM_KILLED_REDUCES 被中止的 reduce 任务数
DATA_LOCAL_MAPS 与输入数据在同一节点上的 map 数
RACK_LOCAL_MAPS 与输入数据在同一机架范围内但不在同一节点上的 map 任务数
OTHER_LOCAL_MAPS 与输入数据不在同一机架范围内的 map 数
MILLIS_MAPS map 任务的总运行时间,单位是毫秒
MILLIS_REDUCES reduce 任务运行的总时间,单位是毫秒

自定义计数器

除了系统级别的计数器,我们还可以定义业务相关的计数器。和单纯的声明变量并递增不同的是,因为任务是分布式运行的,如果直接声明普通变量,无法计算整体的总数。而我们自定义的计数器可以在任务完成时生成一个最终的完整的结果。

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import temptest.MaxTemperatureMapper;
import temptest.MaxTemperatureReducer;

public class HdpCounter extends Configured implements Tool {

    // 自定义计数器,包括两个值,用来统计系统中两种状态时的记录数
    enum SelfCounter {
        CNT_1901,
        CNT_1902,
        CNT_OTHER
    }

    static class TestMapperWithCounters
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private NcdcRecordParser parser = new NcdcRecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            parser.parse(value);

            int year = parser.getYearInt();

            if (year == 1901) {
                context.getCounter(SelfCounter.CNT_1901).increment(1);
            } else if (year == 1902) {
                context.getCounter(SelfCounter.CNT_1902).increment(1);
            } else {
                context.getCounter(SelfCounter.CNT_OTHER).increment(1);
            }

            int airTemperature = parser.getAirTemperature();
            context.write(new Text(parser.getYear()),
                    new IntWritable(airTemperature));
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("使用方式: %s [控制参数] <输入数据路径> <结果输出路径>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Job job = new Job(getConf(), "Max temperature");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(TestMapperWithCounters.class);
        // 设置 combiner 加速处理.节省带宽
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new HdpCounter(), args);
        System.exit(exitCode);
    }
}


可以看到,代码中声明了一个枚举类型,并在需要的地方递增。任务执行完后,计数器的结果会打印在控制台上,也可以在 WEB 界面查看。还可以通过代码获取。

运行:

mvn package

hadoop jar target/hadoop-1.0-SNAPSHOT.jar mapreduce/HdpCounter /home/hdfs/fouryear.txt.gz /home/hdfs/output/tempcounter/

运行完后,控制台上可能有这样的输出:

Shuffle Errors
	BAD_ID=0
	CONNECTION=0
	IO_ERROR=0
	WRONG_LENGTH=0
	WRONG_MAP=0
	WRONG_REDUCE=0
mapreduce.HdpCounter$SelfCounter
	CNT_1901=6565
	CNT_1902=6565
	CNT_OTHER=13136
mapreduce.HdpCounter$TempStatus
	MISSING=44

可以看到 SelfCounter 里的内容输出出来了。任务结束后,如果还想看计数器内的内容,还可以通过命令行查看:

mapred job -counter job_1544353902681_0001 'mapreduce.HdpCounter$TempStatus' MISSING

18/12/09 12:30:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/12/09 12:30:13 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
44

可以看到结果了。

对于非自定义的(任务类、文件类、作业型等),可以先找到对应计数器所属类,然后再确定要查的计数器名。比如要查看 map 输出的记录数,我们知道计数器名是 MAP_INPUT_RECORDS,它属于任务型的,所以它的包是:org.apache.hadoop.mapreduce.TaskCounter。命令行就如下:

mapred job -counter job_1544353902681_0001 'org.apache.hadoop.mapreduce.TaskCounter' MAP_INPUT_RECORDS

任务型、文件型、作业型、输入型、输出型对应的包分别是:

org.apache.hadoop.mapreduce.TaskCounter;
org.apache.hadoop.mapreduce.FileSystemCounter;
org.apache.hadoop.mapreduce.JobCounter;
org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;

该命令需要三个参数,第一个是要查看的任务的ID,第二个是要查看的计数器,第三个是要查看的计数器具体的哪一项。它这里是从历史记录服务器上取的数据,所以要先开启历史服务。

先在配置文件 mapred-site.xml里添加这个属性:

<property>  
        <name>mapreduce.jobhistory.address</name>  
        <value>0.0.0.0:10020</value>  
</property>

然后在命令行里启动历史服务:

/usr/local/hadoop2.9.1/sbin/mr-jobhistory-daemon.sh start historyserver

除了在命令行里查看计数器的值外,还可以在代码里获取。原理差不多,都是去请求历史数据,不过使用的是HADOOP的API:

CounterFields.java

package mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;

public class CounterFields extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        if (args.length != 1) {
            JobBuilder.printUsage(this, "<job ID>");
            return -1;
        }
        String jobID = args[0];
        Cluster cluster = new Cluster(getConf());
        Job job = cluster.getJob(JobID.forName(jobID));
        if (job == null) {
            System.err.printf("任务 ID %s 未找到.\n", jobID);
            return -1;
        }
        if (!job.isComplete()) {
            System.err.printf("任务 %s 还未完成.\n", jobID);
            return -1;
        }

        Counters counters = job.getCounters();
        long total = counters.findCounter(HdpCounter.SelfCounter.CNT_1901).getValue();

        System.out.printf("1901年的数据条数: %d\n", total);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new CounterFields(), args);
        System.exit(exitCode);
    }
}

编译并运行:

mvn package

hadoop jar target/hadoop-1.0-SNAPSHOT.jar mapreduce/MissingTemperatureFields job_1544353902681_0001

18/12/09 12:58:42 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/12/09 12:58:43 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
1901年的数据条数: 6565

排序

排序是 MR 技术的核心。

数据准备

如果想将气温字段进行排序,按温度排列。温度是有符号的,所以不能把它作为 Text 对象进行排序。这里可以用 MR 里的顺序文件,把温度作为 key,整条记录作为 value。

首先需要一个程序,把目前的一些测试数据转换成 SequenceFile:

SortData.java:

package mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class SortData extends Configured implements Tool {

    static class TransMap
            extends Mapper<LongWritable, Text, IntWritable, Text> {

        private NcdcRecordParser parser = new NcdcRecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            parser.parse(value);

            int airTemperature = parser.getAirTemperature();

            context.write(new IntWritable(airTemperature), value);
        }
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "Sort Data");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(TransMap.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // 不需要 reduce
        job.setNumReduceTasks(0);
        // 输出为顺序文件
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 开启压缩
        SequenceFileOutputFormat.setCompressOutput(job, true);
        // Gzip 压缩
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // 块压缩
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SortData(), args);
        System.exit(exitCode);
    }
}

编译并运行:

mvn package

hadoop jar target/hadoop-1.0-SNAPSHOT.jar mapreduce/SortData /home/hdfs/fouryear.txt.gz /home/hdfs/output/sequenfile/

可以查看生成的结果里的文件内容:

hadoop fs -text /home/hdfs/output/sequenfile/part-m-00000 | head -n 5

-78	0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
-72	0029029070999991901010113004+64333+023450FM-12+000599999V0202901N008219999999N0000001N9-00721+99999102001ADDGF104991999999999999999999
-94	0029029070999991901010120004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00941+99999102001ADDGF108991999999999999999999
-61	0029029070999991901010206004+64333+023450FM-12+000599999V0201801N008219999999N0000001N9-00611+99999101831ADDGF108991999999999999999999
-56	0029029070999991901010213004+64333+023450FM-12+000599999V0201801N009819999999N0000001N9-00561+99999101761ADDGF108991999999999999999999

这时候,顺序文件里的值还是未排好序的。现在我们可以使用程序对这里的顺序文件进行排序。

部分排序

MapReduce 会根据输入记录的键对数据集进行排序。

这里正好利用这一特性,我们将输入类型定义为 IntWritable 读取顺序文件,然后原样输出成顺序文件。经过 MapReduce 这一步骤后,顺序文件就会自动排好序。但有一点要注意,由于任务是分布式的,结果经 reduce 任务后会输出多个顺序文件,它们彼此独立。所以,最后生成的是 N 个独立的排好序的顺序文件。N 的值就是我们开启的 reduce 任务的个数。测试:

package mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortSequeneByMR extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "Sort Data");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(SequenceFileInputFormat.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // reduce 个数
        job.setNumReduceTasks(10);
        // 输出为顺序文件
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 开启压缩
        SequenceFileOutputFormat.setCompressOutput(job, true);
        // Gzip 压缩
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // 块压缩
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SortSequeneByMR(), args);
        System.exit(exitCode);
    }
}

运行后,在输出目录有如下结果:

hadoop fs -ls -R /home/hdfs/output/sortsequen
-rw-r--r--   1 root supergroup          0 2018-12-10 09:01 /home/hdfs/output/sortsequen/_SUCCESS
-rw-r--r--   1 root supergroup      42828 2018-12-10 09:00 /home/hdfs/output/sortsequen/part-r-00000
-rw-r--r--   1 root supergroup      37771 2018-12-10 09:01 /home/hdfs/output/sortsequen/part-r-00001
-rw-r--r--   1 root supergroup      42849 2018-12-10 09:01 /home/hdfs/output/sortsequen/part-r-00002
-rw-r--r--   1 root supergroup      22710 2018-12-10 09:01 /home/hdfs/output/sortsequen/part-r-00003
-rw-r--r--   1 root supergroup      29258 2018-12-10 09:01 /home/hdfs/output/sortsequen/part-r-00004
-rw-r--r--   1 root supergroup      11462 2018-12-10 09:01 /home/hdfs/output/sortsequen/part-r-00005
-rw-r--r--   1 root supergroup      41244 2018-12-10 09:01 /home/hdfs/output/sortsequen/part-r-00006
-rw-r--r--   1 root supergroup      40074 2018-12-10 09:01 /home/hdfs/output/sortsequen/part-r-00007
-rw-r--r--   1 root supergroup      33187 2018-12-10 09:01 /home/hdfs/output/sortsequen/part-r-00008
-rw-r--r--   1 root supergroup      24110 2018-12-10 09:01 /home/hdfs/output/sortsequen/part-r-00009

因为我们开启了 10 个 reducer,所以这里有 10 个结果文件。而且里面的内容是分别排好序的。如果我们这里只开启一个 reducer,就会生成一整个排好序的文件。但就牺牲了 mapredue 分布式的特性了,那就完全没必要用 mapreduce 了,而且在大数据的时候性能会非常低。

全排序

如果要想全排序,只用一个分区肯定不行了。但我们可以进行逻辑上的分区,比如:0-10 交给一个 reduce 处理,11-20 给第二个,依此类推。这样最后将各自生成的文件连接起来就是一个排好序的整体文件了。但这样的问题是:数据分布可能不均匀,可能 0-10 内的数据只有几条,但 21-30 内的数据非常多,甚至占了总数据的 70%。这样就会造成性能分配不均匀。

这种情况下,我们可以先了解数据分布的情况,看在各个区间内分别有多少数据,然后再划分分区。但这样就需要遍历整个数据集,不太现实。可行的办法就是对数据集进行采样分析。Hadoop 内部已经提供了这样的功能。我们直接使用即可:

TotalSortBySample.java

package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class TotalSortBySample extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "Sort Data");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(SequenceFileInputFormat.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // reduce
        job.setNumReduceTasks(10);
        // 输出为顺序文件
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 开启压缩
        SequenceFileOutputFormat.setCompressOutput(job, true);
        // Gzip 压缩
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // 块压缩
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

        // Hadoop 自动抽样并分区
        // 使用全排序的分区
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // 使用随机抽样进行分区.采样率为 10%,最多采样 10000 条,最多划分 10 个分区
        InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

        InputSampler.writePartitionFile(job, sampler);

        // 将分区配置添加到分布式缓存
        Configuration conf = job.getConfiguration();
        String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
        URI partitionUri = new URI(partitionFile);
        job.addCacheFile(partitionUri);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new TotalSortBySample(), args);
        System.exit(exitCode);
    }
}

编译并运行:

mvn package

hadoop jar target/hadoop-1.0-SNAPSHOT.jar mapreduce/TotalSortBySample /home/hdfs/output/sequenfile/part-m-00000 /home/hdfs/output/totalsort

运行完后,结果文件如下:

hadoop fs -ls /home/hdfs/output/totalsort
Found 11 items
-rw-r--r--   1 root supergroup          0 2018-12-10 09:48 /home/hdfs/output/totalsort/_SUCCESS
-rw-r--r--   1 root supergroup      27593 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00000
-rw-r--r--   1 root supergroup      34043 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00001
-rw-r--r--   1 root supergroup      25292 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00002
-rw-r--r--   1 root supergroup      36274 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00003
-rw-r--r--   1 root supergroup      28948 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00004
-rw-r--r--   1 root supergroup      37523 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00005
-rw-r--r--   1 root supergroup      32046 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00006
-rw-r--r--   1 root supergroup      33910 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00007
-rw-r--r--   1 root supergroup      32300 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00008
-rw-r--r--   1 root supergroup      34734 2018-12-10 09:48 /home/hdfs/output/totalsort/part-r-00009

可以尝试看一下文件中的内容,理论上,每一个文件都会比它的上一个文件中的内容key大:

hadoop fs -text /home/hdfs/output/totalsort/part-r-00000 | tail -n 3
18/12/10 09:49:27 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
18/12/10 09:49:27 INFO compress.CodecPool: Got brand-new decompressor [.gz]
18/12/10 09:49:27 INFO compress.CodecPool: Got brand-new decompressor [.gz]
18/12/10 09:49:27 INFO compress.CodecPool: Got brand-new decompressor [.gz]
18/12/10 09:49:27 INFO compress.CodecPool: Got brand-new decompressor [.gz]

-94	0029029720999991903012120004+60450+022267FM-12+001499999V0209991C000019999999N0000001N9-00941+99999102601ADDGF100991999999999999999999
-94	0029029070999991902020620004+64333+023450FM-12+000599999V0201401N006219999999N0000001N9-00941+99999100661ADDGF108991999999999999999999
-94	0029029600999991904120906004+61183+022617FM-12+005199999V0201601N002119999999N0000001N9-00941+99999099311ADDGF108991999999999999999999


hadoop fs -text /home/hdfs/output/totalsort/part-r-00001 | head -n 3
18/12/10 09:52:02 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
18/12/10 09:52:02 INFO compress.CodecPool: Got brand-new decompressor [.gz]
18/12/10 09:52:02 INFO compress.CodecPool: Got brand-new decompressor [.gz]
18/12/10 09:52:02 INFO compress.CodecPool: Got brand-new decompressor [.gz]
18/12/10 09:52:02 INFO compress.CodecPool: Got brand-new decompressor [.gz]
-89	0029029810999991901021213004+59500+020350FM-12+002699999V0200201N008719999999N0000001N9-00891+99999100841ADDGF104991999999999999999999
-89	0029029500999991902122920004+61483+021350FM-12+000699999V0200701N009819999999N0000001N9-00891+99999098421ADDGF108991999999999999999999
-89	0029029500999991902031113004+61483+021350FM-12+000699999V0200501N004119999999N0000001N9-00891+99999100991ADDGF100991999999999999999999

和预想的相符。这里使用的是随机取样,还有许多其它的逻辑。如:IntervalSample,是从一定的间隔中定期选择一些数据。可以应用在一些不同的场景。

辅助排序

Hadoop 中,map 的输入是 (K1, V1),输出是 list(K2, V2)。 reduce 的输入是 (K2, list(V2)), 输出是 list(K3, V3)

可见,记录到达 reduce 时,都是按键排好序的。但键对应的值是一个 list,它里面并没有排好序。比如要查某一年的最高气温。记录到达 reduce 的时候,可能是:(1901, 35), (1901, 21), (1901, 38)。

一个数据到哪个 reduce 这是分区算法决定的,在分区时有个按键排序的过程,这是在 map 中完成的,可以自定义一个排序算法,通过 job.setSortComparatorClass() 设置进去。数据复制到 reducer 端后,会进行数据的合并,同一键对应的值会再进行一次排序,这一组数据的排序是在 reducer 端完成的,可以叫组排序,这里也可以自定义一个排序算法,通过 job.setGroupingComparatorClass() 设置进去。

现在考虑实际中的问题:要查每年的最高气温。这实际可以转换成一个排序的问题,如果把数据按年排好序,然后每年中的温度再排好降序,那就取每年数据里的第一条就行了。在分布式环境中,我们要做到的是一年的数据必须在一个分区里,即在一个 reduce 里处理。要不然各个不同 reduce 只知道自己节点上的最高温度,而不知道全局最高温度。所以这里需要自定义一个分区函数,按年划分区。

数据从 map 到 reduce 的过程中,会按键排序。这里可以直接把键设置成 [年+温度],这样就保证第一条就是最高温度,后面的记录可以直接抛弃,不用处理。

代码如下:

package mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class AssistSort extends Configured implements Tool {

    static class MaxTemperatureMapper
            extends Mapper<LongWritable, Text, IntPair, NullWritable> {

        private NcdcRecordParser parser = new NcdcRecordParser();

        @Override
        protected void map(LongWritable key, Text value,
                           Context context) throws IOException, InterruptedException {

            parser.parse(value);
            if (parser.isValidTemperature()) {
                context.write(new IntPair(parser.getYearInt(),
                        parser.getAirTemperature()), NullWritable.get());
            }
        }
    }

    static class MaxTemperatureReducer
            extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {

        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values,
                              Context context) throws IOException, InterruptedException {

            context.write(key, NullWritable.get());
        }
    }

    public static class FirstPartitioner
            extends Partitioner<IntPair, NullWritable> {

        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }

    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            return -IntPair.compare(ip1.getSecond(), ip2.getSecond());
        }
    }

    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            return IntPair.compare(ip1.getFirst(), ip2.getFirst());
        }
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "Assistant Sort Data");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(KeyComparator.class);
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AssistSort(), args);
        System.exit(exitCode);
    }
}

####连接

对于连接查询,最好是用诸如 Pig, Hive, Spark 这类工具。

####边数据分布

##Hadoop集群

##相关项目

###Flume Flume 设计的宗旨是为了向 Hadoop 导入海量数据,如服务器的日志,然后把日志转移到HDFS中。而且,flume 还支持把数据导入到 Hbase, Solr 等目标中。

要使用 Flume 就要使用 Flume 代理。代理是由持续运行的 source(数据来源),sink(数据目标),channel(通道)构成的。

Flume 由一组以分布式结构相互连接的代理构成。系统边缘的代理负责采集数据,并把数据转发给负责汇总的代理,然后再将这些数据存储到最终目的地。代理通过配置来运行一组特定的 source 和 sink。因此,Flume 要做的主要工作就是通过配置使得各个组件连接起来。

数据来源 source 有几种

  1. Syslog Source 。从日志中读取。
  2. Spooling directory Source 。按行读取保存在文件缓冲目录中的文件。
  3. JMS 来自 JSM 队列的内容。
  4. HTTP Source 。监听一个端口,把HTTP请求作为内容。
  5. Exec Source 。运行一个 Unix 命令获取标准输出的内容。如 tail -F /x/xfile
  6. Avro Source。由 Avro sink 或 Flume SDK 通过 Avro RPC 发送的事件

可以在配置文件里进行配置。

传输目的地 sink 有

  1. Kafka Sink 。
  2. Avro Sink 。通过 Avro RPC 发送。
  3. File Roll Sink 。本地文件系统。
  4. HDFS Sink 。HDFS
  5. Elasticsearch
  6. HBase
  7. Solr

也就是说,Flume 可以将接收到的 Event 传输到这几个位置。

Channel 有:

  1. Memory Channel。事件缓存在存储器中,如果此时代理重启,事件会丢失。但吞吐很高。
  2. File Channel. 有持久性,只要事件写入,就会启动代理。

source 产生的事件都会到达 sink,而且至少到达一次。因为有时候事情已经到达,但此时代理重启了。重启后,它还是会把所有未完成的文件重新传递一次。

为提高效率,Flume 在有可能的情况下尽量以事务为单位批量处理事件,而不是逐个处理。因为每个事务只需要写一次本地磁盘,调用一次 fsync。批量处理的大小取决于组件的类型,是可配置的。

这里有示例:

file:///Users/sunyu/ebook/%E4%B8%AA%E4%BA%BA%E6%80%BB%E7%BB%93/%E5%A4%A7%E6%95%B0%E6%8D%AE/storm/storm.html#flume-hdfs-mysql-storm-%E6%B5%81%E9%87%8F%E5%AE%9E%E6%97%B6%E5%88%86%E6%9E%90

file:///Users/sunyu/ebook/%E4%B8%AA%E4%BA%BA%E6%80%BB%E7%BB%93/%E5%A4%A7%E6%95%B0%E6%8D%AE/storm/storm.html#nginx-kafka-hbase-storm-%E5%B9%BF%E5%91%8A%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90

分区和拦截

大型数据集通常会分区保存。这样方便查询。在数据到 sink 时,我们可以选择分区方式。如按时间:

# 日志汇总后要存放在HDFS的目录, 这里有日期命名
master.sinks.log-sink-master.hdfs.path = hdfs://localhost:9000/flume/data/%Y-%m-%d

这里就是按每天来分区。其它的方式可能参照 flume 用户指南。


一个事件被写入哪个分区是由事件的 header 中的 timestamp 决定的。默认情况下,事件的 header 中并没有 timestamp,但是它可以通过 Flume 的拦截器来添加。

>拦截器是一种能够对事侏以中事件进行修改或删除的组件,它们连接 source 并在事件被传递到 channel 之前对事件进行处理。

添加拦截器如:

agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /tmp/spooldir agent1.sources.source1.interceptors = interceptor1 agent1.sources.source1.interceptors.interceptor1.type = timestamp

对一些应用程序而言,事件写入HDFS时使用一个时间戳就够了,但有些应用有多层 Flume 代理,那么创建事件和存入的时间可能就有明显的差异,特别是代理出现停机的情况下。这时候我们可以对 HDFS 的 sink 的 ```hdfs.useLocalTimeStap```属性进行设置,以便使用由运行 HDFS 的 Flume 代理产生时间戳。

文件格式
---
一般而言,使用二进制格式来存储数据更好,因为它生成的文件比使用文本格式的文件更小。 HDFS sink 使用的文件格式由 ```hdfs.fileType``` 及其他一些属性的组合控制。

默认的 ```hdfs.fileType``` 文件格式是 ```SequenceFile```,也就是把事件写入一个顺序文件,这个文件中,键就是事件的时间戳,是一个 LongWritable 类型。值的类型是 BytesWritable。如果 ```hdfs.writeFormat```属性被设置为 Text,那么值的类型就是 Text 类型,不是 BytesWritable 了。

同时,还可以为 sink 设置压缩方式。如:

agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path = /tmp/flume agent1.sinks.sink1.hdfs.filePrefix = events agent1.sinks.sink1.hdfs.fileSuffix = .avro

文件类型是纯文本

agent1.sinks.sink1.hdfs.fileType = DataStream

必须设置为 avro_event

agent1.sinks.sink1.serializer = avro_event

压缩方式

agent1.sinks.sink1.serializer.compressionCodec = snappy



扇出
---

可以从一个 source 获取数据,分发到之个 sink ,当然是通过之个 channle。这样就可以实现这种业务:数据存入 Hbase 持久化,同时存入 Solr 提供实时搜索。

多个 sink 配置可能如下:

agent1.sources = source1

配置两个 sink

agent1.sinks = sink1a sink1b

配置两个渠道

agent1.channels = channel1a channel1b

agent1.sources.source1.channels = channel1a channel1b agent1.sources.source1.selector.type = replicating agent1.sources.source1.selector.optional = channel1b

各个 sink 用哪个渠道

agent1.sinks.sink1a.channel = channel1a agent1.sinks.sink1b.channel = channel1b

采用缓冲文件的方式存储

agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /tmp/spooldir

第二种目的地是HDFS

agent1.sinks.sink1a.type = hdfs agent1.sinks.sink1a.hdfs.path = /tmp/flume agent1.sinks.sink1a.hdfs.filePrefix = events agent1.sinks.sink1a.hdfs.fileSuffix = .log agent1.sinks.sink1a.hdfs.fileType = DataStream

第二种目的地

agent1.sinks.sink1b.type = logger

agent1.channels.channel1a.type = file agent1.channels.channel1b.type = memory


复用
---
还有一种用法是在 source 上设置一个复用选择器。这样就可以让一部分事件到一个 sink,一部分到另一个。用法:

http://flume.apache.org/FlumeUserGuide.html#multiplexing-channel-selector


代理层分发
---
通常,我们是让代理接收事件,然后写入 sink。但如果代理有很多,同时写 sink 时,压力会比较大。这时候可以引入一层代理分发层。让代理把数据先传到分发层,分发代理数目远小于节点数。这就要求代理和分发层之前有一个特殊的数据传递。如:

普通代理

agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1

agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1

agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /tmp/spooldir

通过 avro 将事件传递到分发层

agent1.sinks.sink1.type = avro agent1.sinks.sink1.hostname = localhost agent1.sinks.sink1.port = 10000

agent1.channels.channel1.type = file agent1.channels.channel1.checkpointDir=/tmp/agent1/file-channel/checkpoint agent1.channels.channel1.dataDirs=/tmp/agent1/file-channel/data

代理分发

agent2.sources = source2 agent2.sinks = sink2 agent2.channels = channel2

agent2.sources.source2.channels = channel2 agent2.sinks.sink2.channel = channel2

数据来源是 avro,接受代理的事件. host 和 端口和代理层设置的一样

agent2.sources.source2.type = avro agent2.sources.source2.bind = localhost agent2.sources.source2.port = 10000

分发层将事件写入 HDFS

agent2.sinks.sink2.type = hdfs agent2.sinks.sink2.hdfs.path = /tmp/flume agent2.sinks.sink2.hdfs.filePrefix = events agent2.sinks.sink2.hdfs.fileSuffix = .log agent2.sinks.sink2.hdfs.fileType = DataStream

agent2.channels.channel2.type = file agent2.channels.channel2.checkpointDir=/tmp/agent2/file-channel/checkpoint agent2.channels.channel2.dataDirs=/tmp/agent2/file-channel/data


Sink组
---
sink 组可以让多个 sink 当作一个 sink 处理,以实现故障转移和负载均衡。

第一个代理

agent1.sources = source1 agent1.sinks = sink1a sink1b

声明 sink 组

agent1.sinkgroups = sinkgroup1 agent1.channels = channel1

agent1.sources.source1.channels = channel1

定义两个 sink,都使用 channel1

agent1.sinks.sink1a.channel = channel1 agent1.sinks.sink1b.channel = channel1

将两个 sink 都加入 sink 组

agent1.sinkgroups.sinkgroup1.sinks = sink1a sink1b

声明负载均衡.事件将会轮流使用两个 sink

agent1.sinkgroups.sinkgroup1.processor.type = load_balance

声明热备

agent1.sinkgroups.sinkgroup1.processor.backoff = true

agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /tmp/spooldir

引入分发层

agent1.sinks.sink1a.type = avro agent1.sinks.sink1a.hostname = localhost agent1.sinks.sink1a.port = 10000

引入第二个分发层.端口不同

agent1.sinks.sink1b.type = avro agent1.sinks.sink1b.hostname = localhost agent1.sinks.sink1b.port = 10001

agent1.channels.channel1.type = file agent1.channels.channel1.checkpointDir=/tmp/agent1/file-channel/checkpoint agent1.channels.channel1.dataDirs=/tmp/agent1/file-channel/data

第二层分发层

agent2a.sources = source2a agent2a.sinks = sink2a agent2a.channels = channel2a

agent2a.sources.source2a.channels = channel2a agent2a.sinks.sink2a.channel = channel2a

agent2a.sources.source2a.type = avro agent2a.sources.source2a.bind = localhost agent2a.sources.source2a.port = 10000

agent2a.sinks.sink2a.type = hdfs agent2a.sinks.sink2a.hdfs.path = /tmp/flume agent2a.sinks.sink2a.hdfs.filePrefix = events-a agent2a.sinks.sink2a.hdfs.fileSuffix = .log agent2a.sinks.sink2a.hdfs.fileType = DataStream

agent2a.channels.channel2a.type = file agent2a.channels.channel2a.checkpointDir=/tmp/agent2a/file-channel/checkpoint agent2a.channels.channel2a.dataDirs=/tmp/agent2a/file-channel/data

第二个分发代理.端口不同

agent2b.sources = source2b agent2b.sinks = sink2b agent2b.channels = channel2b

agent2b.sources.source2b.channels = channel2b agent2b.sinks.sink2b.channel = channel2b

agent2b.sources.source2b.type = avro agent2b.sources.source2b.bind = localhost agent2b.sources.source2b.port = 10001

agent2b.sinks.sink2b.type = hdfs agent2b.sinks.sink2b.hdfs.path = /tmp/flume agent2b.sinks.sink2b.hdfs.filePrefix = events-b agent2b.sinks.sink2b.hdfs.fileSuffix = .log agent2b.sinks.sink2b.hdfs.fileType = DataStream

agent2b.channels.channel2b.type = file agent2b.channels.channel2b.checkpointDir=/tmp/agent2b/file-channel/checkpoint agent2b.channels.channel2b.dataDirs=/tmp/agent2b/file-channel/data



HBase
===

Hbase 是建立在 HDFS 上的一个面向列的分布式数据库。它和 RDBMS 相似,但它有单元格的版本,每行数据是排序的,而且只要列族预告存在,可以随时添加新列。

HBase 依赖 Zookeeper。用来协调集群。

Hbase 通过在 HDFS 上提供随机读/写来解决 Hadoop 不能处理的问题。Hbase 从底层设计就开始考虑各种可扩展问题。一张表可以很大,数十亿个行,数百万个列;水平分区可以在上千个普通的商用机节点上。这样保证了高效的存储,搜索功能。

RDBMS 的模式比较固定,面向行,且具有 ACID和复杂的SQL查询引擎。强调事务的强一致性。在RDBMS中可以很容易建立二级索引,执行复杂的连接,排序,分组等操作,对或、行、列中的数据进行分页存放。

在小型应用中,诸如 MySQL 这样的 RDMBS 很灵活,方便。但数据规模和并发读写这两方面中任何一个如果要向上大规模的扩展,RDMBS 就会碰到瓶颈。通常会需要我们放松 ACID,让数据、业务管理更复杂。

HBase 有一些特性:

1. 没有真正的索引。行是顺序存储的,每行中的列也是,所以不存在索引膨胀的问题,而且插入性能和表的大小无关。
2. 自动分区。在表增长时,表会自动分裂成区域,并分布到可用的节点上。
3. 线性扩展。当新增节点时,把它指向现有集群并运行,系统会自动平衡,均衡分布。
4. 普通硬件支持。集群中的硬件不需要是很高配置的。普通的商用硬件就行。
5. 容错性。大量节点意味着每个节点的重要性并不突出。不用担心单点的问题。
6. 批处理。MapReduce的功能可以让我们用全并行的分布式作业处理业务。

ZooKeeper
===


分布式应用中,主要的困难是会出现“部分失败”。两个节点间通信时,如果出现网络错误,发送者无法知道接收者是否已经收到。zookeeper 是为了解决这一问题而诞生的,它不能解决避免出现失败,也不会隐藏失败,它提供一组工具,使得出现部分失败时能够妥善的处理。

它的特点有:

1. 设计简单。zookeeper 的核心是一个精简的文件系统,它提供一些简单的操作和一些额外的抽象操作,如排序,通知。
2. 高可用。它可以运行于一组机器之上,并且设计上有高可用性。可以帮助系统避免出现单点故障。
3. 松耦合。在交互的过程中,参与者只需要关注自己的信息,不用管它方。就算自己处理过程结束了,其它进程还可以读取到这条信息。
4. 高性能。它的吞量可以超过每秒 10000 个操作。

安装
---

进入下载页面:http://mirror.bit.edu.cn/apache/zookeeper/ 选择一个稳定版本。

cd /data/soft/ wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz

tar -zxf zookeeper-3.4.12.tar.gz cd zookeeper-3.4.12 cp conf/zoo_sample.cfg conf/zoo.cfg

cd .. mv /data/soft/zookeeper-3.4.12 /usr/local/zookeeper


设置简单的配置文件 conf/zoo.cfg:

***单机模式:***

tickTime=2000 dataDir=/var/lib/zookeeper/ clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.111.111:2888:3888


启动服务:

/usr/local/zookeeper/bin/zkServer.sh start


启动后可以看到 2181 端口已经被监听。这时候可以做一些简单的测试:

echo ruok | nc localhost 2181 imok


imok 的意思是 zookeeper 服务器返回:i'm ok。还有一些其它命令可以用来管理 zookeeper,都是类似这种四个字母的组合。如:

| 命令 | 描述 |
|-----|-----|
| ruok | 如果服务器正在运行且未处于出错状态,返回 imok |
| conf | 输出服务器的配置信息,根据 zoo.cfg |
| envi | 输出服务器的环境变量,包括版本,JAVA版本及其它信息 |
| srvr | 服务器的统计信息,包括延迟统计,znode 数量,运行模式等 |
| stat | 服务器的统计信息和已连接的客户端 |
| srst | 重置服务器的统计信息 |
| isro | 显示服务器是否处于只读模式或者读写模式 |
| dump | 列出集合体中所有会话和临时 znode。必须连接到 leader 上才能用该命令 |
| cons | 列出所有服务器客户端的连接统计信息 |
| crst | 重置连接统计信息 |
| wchs | 列出服务器上所有监听的摘要信息 |
| wchc | 按连接列出服务器上所有的监听 |
| wchp | 按 znode 路径列出服务器上所有的监听 |
| mntr | 按JAVA属性格式列出服务器统计信息 |

应用
---
使用 zookeeper 最常用的就是在上面创建节点,设置节点的值。以及节点内容编辑,删除。代码可能如下:

***ZkpTest.java***

package Zookeeper;

import java.util.List;

import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.KeeperState;

import java.io.IOException; import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZkpTest implements Watcher { private static final int SESSION_TIMEOUT = 5000; private CountDownLatch connectedSignal = new CountDownLatch(1);

/**
 * 创建组
 *
 * @param groupName 组名
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
public void create(String groupName) throws IOException, KeeperException,
        InterruptedException {
    ZooKeeper zk = new ZooKeeper("192.168.111.111", SESSION_TIMEOUT, this);

    String path = "/" + groupName;
    // 创建持久化节点,不是临时的.节点的内容为空,节点访问权限是完全开放
    String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    System.out.println("创建组 " + createdPath);
}

/**
 * 在组里创新子节点
 *
 * @param groupName
 * @param memberName
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
public void join(String groupName, String memberName) throws IOException, KeeperException,
        InterruptedException {
    ZooKeeper zk = new ZooKeeper("192.168.111.111", SESSION_TIMEOUT, this);

    connectedSignal.await();

    String path = "/" + groupName + "/" + memberName;
    // 创建临时节点
    String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL);
    System.out.println("创建节点 " + createdPath);
}

/***
 * 列出组里所有的节点
 * @param groupName
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
public void list(String groupName) throws IOException, KeeperException,
        InterruptedException {
    String path = "/" + groupName;

    try {
        ZooKeeper zk = new ZooKeeper("192.168.111.111", SESSION_TIMEOUT, this);

        List<String> children = zk.getChildren(path, false);
        if (children.isEmpty()) {
            System.out.printf("组里无节点 %s\n", groupName);
            System.exit(1);
        }
        for (String child : children) {
            System.out.println(child);
        }
    } catch (KeeperException.NoNodeException e) {
        System.out.printf("组 %s 不存在\n", groupName);
        System.exit(1);
    }
}

/**
 * 删除组以及组里的子节点
 * @param groupName
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
public void delete(String groupName) throws IOException, KeeperException,
        InterruptedException {
    String path = "/" + groupName;

    try {
        ZooKeeper zk = new ZooKeeper("192.168.111.111", SESSION_TIMEOUT, this);

        List<String> children = zk.getChildren(path, false);
        for (String child : children) {
            zk.delete(path + "/" + child, -1);
            System.out.printf("删除 %s 组里的节点 %s\n", groupName, child);
        }
        zk.delete(path, -1);
    } catch (KeeperException.NoNodeException e) {
        System.out.printf("组 %s 不存在\n", groupName);
        System.exit(1);
    }
}

public void process(WatchedEvent event) {
    if (event.getState() == KeeperState.SyncConnected) {
        connectedSignal.countDown();
    }
}

public static void main(String[] args) throws Exception {
    ZkpTest test = new ZkpTest();
    test.create("test");
    test.list("test");
    test.join("test", "anode");
    test.list("test");
    test.delete("test");
    test.list("test");
} }

详细说明
---

***数据模型***

Zookeeper 维护着一个类似文件系统的树形层次结构。树中的节点叫 znode。它可以用来存储数据,并且有访问权限。这种设计可以用来实现协调服务,而不是大数据的存储,通常一个 znode 存储的数据都被限制在 1MB 以内。

zookeeper 的数据访问具有原子性。客户端在读取一个 znode 的数据时,要么读到所有数据,要么读取失败,不会只读到部分数据。同样,写操作也将替换所有 znode 里的数据,不会部分节点成功写入部分失败。zookeeper 不支持添加操作。

zookeeper 中的路径象文件系统的路径。使用斜杠分隔目录。但 zookeeper 的路径必须是绝对路径,没有相对路径的概念。所有的路径表示必须是规范的,每条路径只有唯一的一种表示方法,不支持类似 /a/./b 这种方式。在文件系统中 . 可以表示当前目录,而 zookeeper 中不支持。

>znode 有两种类型,一种临时的,一种持久的。类型在 znode 创建后就不会再变更。临时节点在客户端会话结束时就会被删除。持久节点只有显式的调用删除功能才会删除。

创建节点时,还可以指定序号标识。这样,znode 名称后就会附加一个序号,该值是一个单调递增的计数器。由它的父节点维护。如,创建一个名为 /test- 的顺序节点,它的名称可能是 /test-1, /test-2 这样。顺序号可以用来为并发时的事件排序,这样客户端就能通过序号推断事件发生的先后。以及实现分布式锁。

我们还可以在节点上添加监听,当节点发生变化时通知监听的客户端。监听是一次性的,事件触发后就消失了,所以通常是在触发后再添加新的监听。

***操作***

zookeeper 中有 9 种操作:

| 操作 | 说明 |
| ----| ---- |
| create | 创建一个 znode,必须要有父节点 |
| delete | 删除一个 znode,它不能有子节点 |
| exists | 检测节点是否存在并查询它的元数据 |
| getACL, setACL | 获取或设置一个节点的访问权限 |
| getChildren | 获取一个节点的子节点列表 |
| getData, setData | 获取或设置一个 znode 保存的数据 |
| sync | 将客户端 znode 数据和 zookeeper 同步 |

zookeeper 中更新数据是有条件的。在使用 delete 或 setData 时,必须提供被更新节点的版本号(可通过 exists 获取)。如果版本号不匹配,更新操作会失败。

zookeeper 中还有一个被称为 multi 的操作,它是将多个基本操作作为一个单元,并保证这些操作同时被成功执行。类似事务。该特性可以用于需要保持全局一致性的数据结构。

create, delete, setData 这些操作上可以触发设置在 exists, getChildren, getData 上的监听功能。权限相关设置不行。

1. 当被监听的 znode 被创建、删除、或数据被更新时,设置在 exists 上的监听会被触发。
2. 当被监听的 znode 被删除或数据被更新时,设置在 getData 操作上的监听被触发。创建节点不会触发。
3. 被监听的 znode 被创建或删除时,或所观察的 znode 自己被删除时,设置在 getChildren 上的观察将被触发。

监听节点事件触发对应关系是:

| 监听操作 | 创建 znode | 创建子节点 | 删除 znode | 删除子节点 | setData |
| ---- | ---- | ---- | ---- | ---- | ---- |
| exists | 触发 | | 触发 | | 触发 |
| getData | | | 触发 | | 触发 |
| getChildren | | 触发 | 触发 | 触发 | |


每个 znode 创建时都会带有一个 ACL 列表,用于决定谁可以对它执行何种操作。ACL 依赖于 zookeeper 客户端的身份验证机制 。有三种身份验证机制:

1. digest, 通过用户名和密码来识别
2. sasl, 通过 Kerberos 识别
3. ip,通过客户端的IP地址来识别

在建立一个会话后,客户端可以对自己进行身份验证。如:

zk.addAuthInfo(“digest”, “tom:123456”.getBytes());

设置权限可以如下:

// 为 ip 10.00.1 添加读的权限 new ACL(Perms.READ, new Id(“ip”, “10.0.0.1”)); ```

exists 操作并不受权限的限制。因此任何客户端都可以使用 exists 检测一个 znode 的状态或者查看是否存在。

ACL权限对应表如下:

ACL 权限 允许的操作
CREATE create 子节点
READ getChildren, getData
WRITE setData
DELETE delete 子节点
ADMIN setACL

底层

zookeeper 可以以单机模式运行,也可以用集群模式来增加高可用性。 在集群中,只要有半数以上的机器正常,整个系统就正常。所以,通常集群里机器的数目是奇数。zookeeper 要保证每个修改都要被复制到集群中超过半数的机器上。要实现这个需求,它使用了 Zab 协议。该协议包括两个可以无限重复的阶段:

  1. 领导者选举。集群中所有机器通过选举,选出一台 master,其它机器作为 slaver。数据变更时,有半数以上的 slaver 数据与 master 同步,则表明该阶段已经完成。
  2. 原子广播。所有的写请求都会被转发给 master,再由它转发给 slaver。当半数以上的 slaver 已经将修改持久化后,master 才会提交这个更新,然后客户端才会收到更新成功的响应。

如果 master 出现故障,其它机器会选出另外一个 master,并和它一起提供服务。如果之前的 master 恢复正常,它会成为一个 slaver。

由于超过半数 slave 的数据更新,这次操作就算完成。那么就可能存在有的 slave 上的数据比 master 上旧。为了应对这种情况,可以在读取数据之前先对要读取的值调用 sync 操作,它会强制该服务器和服务器同步。

理想状态下是所有的客户端都连接和 master 数据一致的节点。当然,master 也是可以被连接的。可以通过设置 leaderServers 的值为 no,让它不接受客户端连接,只负责协调更新。

每一个对 znode 树的更新都会有一个全局唯一的ID。zookeeper 会按编号进行排序,且按顺序执行操作。zookeeper 设计中用如下几种方法保证了数据的一致性:

  1. 顺序一致。操作会按顺序执行。
  2. 原子性。更新操作要么成功要么失败。如果更新失败,不会有客户端看到这个更新的结果。
  3. 单一视图.无论客户端连接的是哪个 ZooKeeper 服务器,其看到的服务端数据模型都是一致的。
  4. 可靠性.一旦服务端成功的应用了一个事务,并完成对客户端的响应,那么该事务所引起的服务端状态变更将会被一直保留下来,除非有另一个事务又对其进行了变更。
  5. 及时性。所有的读操作都是从内存获取数据,读操作不参与编号排序。我们还可以通过 sync 保证数据低延迟。

会话

每个 zookeeper 客户端的配置中都包括整个集群中的服务器列表。启动时,客户端会尝试连接到列表中的一台服务器,如果失败,它会尝试连接另一台,直到成功与服务器建立连接。建立连接后,这台服务器就会为该客户端创建一个新的会话。每个会话都有一个超时时间,该设置是由创建会话的应用设定的。如果服务器在超时时间内没有收到任何请求,则会话过期,连接断开。也就是说会话空闲不能超过过期时间

zookeeper 客户端可以自动进行故障切换,切换到另一台服务器上,而且所有的会话及临时znode都还有效。切换故障的过程中,应用程序将收到断开连接和连接到服务的通知。断开时,监听通知没法发送,但重新连接上后通知还是会发送。在重新连接过程中如果想进行一些操作,这是会失败的。

zookeeper 中,tick time 参数定义了 zookeeper 中的基本时间周期,并被集群中服务器用来定义相互交互的时间。其它设置都是根据 tick time 来定义的,或受它限制。例如,会话超时时间不可以小于 2 个 tick time,并不能大于 20 个 tick time。如果应用端将值设置在这个范围外,系统会自动把它修复到范围内。

通常会把 tick time 设置为 2 秒。也就是说会话超时时间通常是 40 秒。较短的超时设置可以较快的检测到机器故障,但太低的时候容易把网络繁忙的传输延迟误认为是超时。这样就会出现机器的振动现象,就是机器在短时间内反复的故障,恢复。

zookeeper 集群中服务器越多,会话超时时间应该设置的越大。

状态

zookeeper 对象在其生命周期中会经历几种不同的状态。可以通过 getState() 方法来查询对象的状态。状态有:CONNECTING, CONNECTED, CLOSED。zookeeper 实例在一个时刻只能处于一种状态。

如果添加了监听,客户端可以接收到zookeeper 的状态转换通知。当连接到服务时,会进入 CONNECTED 状态,观察者会收到一个 WatchedEvent 通知,其中 KeeperState 的值是 SyncConnected。如果这时候断开重连,状态就会从 CONNECTED 转化为 CONNECTING 然后再到 CONNECTED。断开时,观察者会收到 Disconnected 事件。

如果程序中调用 close() 方法,或出现会话超时,zookeeper 实例会转换到 CLOSED 状态。

应用实例

配置中心

在 zookeeper 上创建 /config 节点,将配置信息设置为值。并且每个应用程序服务器都从 zookeeper 上获取配置信息并监听该节点。如果节点内容有变更,就更新程序端配置。并添加新的监听。

分布式锁

并发的时候,大家同时创建一个临时的有序的 znode,然后按序号先后占有资源。这时要面对一个问题,使用完后要通知后面的操作。可以让各个节点监听它前面的节点。比如某客户端创建的节点是 lock-5, 它可以监听 lock-4,在它消失的时候获得通知。

但这时又会有另一个问题。如果有一个节点创建时出问题了,这时客户端会重新连接,重连后会再次创建一个节点。那先前创建的节点就会成为一个永远无法删除的节点,这样就会死锁,因为后面的请求永远得不到锁。这个问题的根源在于客户端重连后无法判断自己是否已经创建过子节点。为了解决这个问题,可以在节点的名称上加上 sessionID,因为重连后该值是不变的。

比如,创建锁 /buylock/lock-- ,index 的值是父节点 buylock 维护的,递增的序号。重连后先看看有没有名称里有自己 sessionID 的,如果有就不用新建了。


Similar Posts

下一篇 Java编码规范

评论