Linux环境HBase安装配置及使用

3,153 阅读11分钟

Linux环境HBase安装配置及使用

1. 认识HBase

(1) HBase介绍

  • HBase = Hadoop database,Hadoop数据库
  • 开源数据库
  • 官网:hbase.apache.org/
  • HBase源于Google的BigTable
  • Apache HBase™是Hadoop数据库,是一个分布式,可扩展的大数据存储。
  • 当需要对大数据进行随机、实时读/写访问时,请使用Apache HBase™。该项目的目标是托管非常大的表 - 数十亿行X百万列 - 在商品硬件集群上。Apache HBase是一个开源的,分布式的,版本化的非关系数据库nosql,模仿Google的Bigtable: Chang等人的结构化数据分布式存储系统。正如Bigtable利用Google文件系统提供的分布式数据存储一样,Apache HBase在Hadoop和HDFS之上提供类似Bigtable的功能。
  • HBase可执行基于Yarn平台的计算任务,但不擅长。

(2) HBase集群角色

  • HDFS:
    • NameNode——主节点
    • DataNode——数据存储节点
  • Yarn:
    • ResourceManager——全局的资源管理器
    • NodeManager——分节点资源和任务管理器
  • HBase:
    • HMaster
      • 负责Table表和RegionServer的监控管理工作
      • 处理元数据的变更
      • 对HRegionServer进行故障转移
      • 空闲时对数据进行负载均衡处理
      • 管理Region
      • 借助ZooKeeper发布位置到客户端
    • HRegionServer
      • 负责Table数据的实际读写
      • 刷新缓存数据到HDFS
      • 处理Region
      • 可以进行数据压缩
      • 维护Hlog
      • Region分片

(3) Hbase架构

  • HRegionServer结构:
    • HLog:存储HBase的修改记录
    • HRegion:根据rowkey(行键,类似id)分割的表的分片
    • Store:对应HBase表中的一个列族,可存储多个字段
    • HFile:真正的存储文件
    • MemStore:保存当前的操作
    • ZooKeeper:存放数据的元数据信息,负责维护RegionServer中保存的元数据信息
    • DFS Client:存储数据信息到HDFS集群中

2. HBase-1.3.0安装配置流程

(1) HBase准备

  • Hadoop集群环境
  • ZooKeeper集群环境

(2) 解压HBase-1.3.0相关安装包到目标目录下:

  • tar -zxvf .tar.gz -C 目标目录

(3) 修改配置文件:

  • 进入hbase/conf路径:
    • vi hbase-env.sh
    •  # The java implementation to use.  Java 1.7+ required.
       export JAVA_HOME=jdk安装路径
      
       # 注释掉以下语句(jdk1.8中不需要这个配置)
       # export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
       # export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
      
       # Tell HBase whether it should manage it's own instance of Zookeeper or not.
       # 关闭HBase自带的ZooKeeper
       export HBASE_MANAGES_ZK=false
      

  • vi hbase-site.xml
  •  <!-- 设置namenode所在位置(HDFS中存放的路径) -->
       <property>
           <name>hbase.rootdir</name>
           <value>hdfs://bigdata01:9000/hbase</value>    
       </property>
       
       <!-- 是否开启集群 -->
       <property>
           <name>hbase.cluster.distributed</name>
           <value>true</value>
       </property>
       
       <!-- HBase-0.9.8之前默认端口为60000 -->
       <property>
           <name>hbase.master.port</name>
           <value>16000</value>
       </property>
       
       <!-- zookeeper集群的位置 -->
       <property>
           <name>hbase.zookeeper.quorum</name>
           <!-- 注意不要有空格 -->   <value>bigdata01:2181,bigdata02:2181,bigdata03:2181</value>
       </property>
       
       <!-- hbase的元数据信息存储在zookeeper的位置 -->
       <property>
           <name>hbase.zookeeper.property.dataDir</name>
           <value>/XXX/zookeeper-3.4.10/zkData</value>
       </property>
    

  • vi regionservers
  • 加入从节点主机名

(4) 解决依赖问题

  • 进入hbase/lib路径:
    • rm -rf hadoop-*
    • rm -rf zookeeper-*
    • 把相关版本的zookeeper和hadoop的依赖包导入到hbase/lib

(5) 软连接hadoop配置

  • ln -s /XXX/hadoop/core-site.xml
  • ln -s /XXX/hadoop/hdfs-site.xml

(6) 拷贝配置好的HBase到其他机器上

  • scp -r hbase-1.3.0/ bigdata02:$PWD
  • scp -r hbase-1.3.0/ bigdata03:$PWD

(7) 配置环境变量:

  • 修改配置文件:
    • vi /etc/profile
  • 增加以下内容:
    • export HBASE_HOME=/opt/module/hbase-1.3.0
    • export PATH=$PATH:$HBASE_HOME/bin
  • 声明环境变量:
    • source /etc/profile

(8) 启动集群

  • 启动主节点
    • hbase-daemon.sh start master
  • 启动从节点
    • hbase-daemon.sh start regionserver

(9) 关闭集群

  • 关闭主节点
    • hbase-daemon.sh stop master
  • 关闭从节点
    • hbase-daemon.sh stop regionserver

(10) UI界面

3. HBase Shell操作

(1) 启动终端

  • hbase shell
  • 回退:ctrl + <-

(2) 操作命令:

  • 注意:
    • 若报错:ERROR: Can't get master address from ZooKeeper; znode data == null
    • 解决方案:虚拟机挂起,会使zookeeper不稳定,进而造成hbase不稳定,若遇到报错,重启HBase集群即可。
  • 查询表:
    • list
  • 显示HBase服务器状态:
    • status '主机名'
    • 1 active master:存活的主节点
    • 0 backup masters:备份的主节点
    • 3 servers:从节点
    • 0 dead:宕机
    • 0.6667 average load:平均加载时间
  • 显示当前用户:
    • whoami
  • 创建表:
    • create '表名', '列族1', '列族2'
  • 查看表:
    • 全表扫描:
      • scan '表名'
    • 指定Rowkey扫描:
      • scan '表名', {STARTROW => 'Rowkey值', STOPROW => 'Rowkey值'}
      • STOPROW为可选指令,值为实际查看Rowkey+1
    • 查看表结构:
      • describe '表名'
      • 修改表结构信息:
        • alter '表名', {NAME => '列族名', 变更字段名 => ' '}
    • 查询指定数据信息:
      • 指定具体的rowkey:
        • get '表名', 'rowkey'
      • 指定具体的列:
        • get '表名', 'rowkey', '列族:列名'
      • 统计表行数:
        • count '表名'
        • 根据Rowkey进行统计
  • 表中添加数据信息:
    • put '表名', 'rowkey', '列族:列名', '值'
    • HBase只有覆盖没有修改
    • 覆盖时对应表名、rowkey、列族、列名字段,输入新的值信息
  • 清空表:
    • truncate '表名'
  • 删除表:
    • 指定表不可用:
      • disable '表名'
    • 删除:
      • drop '表名'

(3) 退出终端

  • exit或者quit

4. HBase读写数据

(1) HBase读数据流程

  • 概述:

    • 客户端Client访问ZooKeeper,返回-ROOT-表元数据位置,根据元数据位置去查找对应的RegionServer,同时根据-ROOT-查找到.META表,再根据.META表的元数据查找到Region,返回Region的业务元数据给客户端。

  • 具体流程:

    • 客户端Client从Region中的Store中读取数据,若在写数据缓存memstore(存放用户最近写入的数据)中读到对应数据,则直接返回数据信息到客户端,若memstore中不存在对应数据,则去读数据缓存blockcache中查找,若blockcache中仍未找到,则去对应的HFile查找数据信息并存入blockcache,进而通过blockcache返回数据信息到客户端。HBase读写分离

(2) HBase写数据流程

  • 客户端Client发送写数据请求,通过ZooKeeper获取到表的元数据信息,客户端通过RPC通信查找到对应的RegionServer,进而找到Region,同时在HLog中记录写操作,通过HLog把数据信息写入到memstore(16KB),memstore存满后溢写到storeFile中,最后HDFS客户端统一存储到HFile。

5. HBase API操作

(1) 准备工作

  • 新建Maven工程,pom.xml中添加依赖:
    • <dependencies>
          <dependency>
              <groupId>org.apache.hbase</groupId>
              <artifactId>hbase-server</artifactId>
              <version>1.3.0</version>
          </dependency>
          <dependency>
              <groupId>org.apache.hbase</groupId>
              <artifactId>hbase-client</artifactId>
              <version>1.3.0</version>
          </dependency>
      </dependencies>
      
  • 从HBase下的conf/目录中导出core-site.xml,hbase-site.xml,hdfs-site.xml,导入Maven工程的resources目录下。
    • 注意:不进行该操作会导致与HBase连接失败

(2) 具体操作

  • package ybcarry.hbase;
    
    
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.hbase.*;
      import org.apache.hadoop.hbase.client.*;
      import org.apache.hadoop.hbase.util.Bytes;
      
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.List;
      
      /**
       * @ClassName: HbaseTest
       * @Description
       * @Author: YBCarry
       * @Date2019-03-30 21:10
       * @Version: V1.0
       **/
      
      public class HbaseTest {
      
          public static Configuration conf ;
      
          //配置信息
          static {
              conf = HBaseConfiguration.create() ;
          }
      
          /**1. 判断HBase中表是否存在*/
          public static boolean isExist(String tableNane) throws IOException {
      
              //旧版本操作表
              //HBaseAdmin admin = new HBaseAdmin(conf);
      
              //加载配置
              Connection connection = ConnectionFactory.createConnection(conf) ;
      
              //管理器
              HBaseAdmin admin = (HBaseAdmin)connection.getAdmin() ;
      
              return admin.tableExists(TableName.valueOf(tableNane)) ;
          }
      
          /**2. 在HBase中创建表*/
          public static void createTable(String tableName, String... columnFamilly) throws IOException {
      
              //加载配置
              Connection connection = ConnectionFactory.createConnection(conf) ;
      
              //a. 如果对表的操作需要使用管理器
              HBaseAdmin admin = (HBaseAdmin)connection.getAdmin() ;
      
              //b. 创建描述器
              HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)) ;
      
              //c. 指定多个列族
              for (String cf : columnFamilly) {
                  htd.addFamily(new HColumnDescriptor(cf)) ;
              }
      
              //d. 创建表
              admin.createTable(htd) ;
              System.out.println("createTable----successful") ;
          }
      
          /**3. 向表中添加数据*/
          public static void addData(String tableName, String rowkey, String cf, String column, String value) throws IOException {
      
              //加载配置
              Connection connection = ConnectionFactory.createConnection(conf) ;
      
              //a. 指定表
              Table table = connection.getTable(TableName.valueOf(tableName)) ;
      
              //b. 添加数据 put方式
              Put put = new Put(Bytes.toBytes(rowkey)) ;
      
              //c. 指定添加的列族 列 值
              put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value)) ;
      
              //d. 写入表中
              table.put(put) ;
              System.out.println("addData----successful") ;
      
          }
      
          /**4. 删除一个rowkey*/
          public static void deleteRow(String tableName, String rowkey) throws IOException {
      
              //加载配置
              Connection connection = ConnectionFactory.createConnection(conf) ;
      
              //a. 指定表
              Table table = connection.getTable(TableName.valueOf(tableName)) ;
      
              //b. 删除rowkey delete方式
              Delete delete = new Delete(Bytes.toBytes(rowkey)) ;
      
              //d. 执行操作
              table.delete(delete) ;
              System.out.println("deleteRow----successful") ;
          }
      
          /**5. 删除多个rowkey*/
          public static void deleteRows(String tableName, String... rowkey) throws IOException {
      
              //加载配置
              Connection connection = ConnectionFactory.createConnection(conf) ;
      
              //a. 指定表
              Table table = connection.getTable(TableName.valueOf(tableName)) ;
      
              //b. 封装delete操作
              List<Delete> ds = new ArrayList<Delete>() ;
      
              //c. 遍历rowkey
              for (String rk : rowkey) {
                  Delete deletes = new Delete(Bytes.toBytes(rk)) ;
                  ds.add(deletes) ;
              }
      
              //d. 执行操作
              table.delete(ds) ;
              System.out.println("deleteRows----successful") ;
          }
      
          /**6. 全表扫描*/
          public static void scanAll(String tableName) throws IOException {
      
              //加载配置
              Connection connection = ConnectionFactory.createConnection(conf) ;
      
              //a. 指定表
              Table table = connection.getTable(TableName.valueOf(tableName)) ;
      
              //b. 扫描操作
              Scan scan = new Scan() ;
      
              //c. 获取返回值
              ResultScanner rs = table.getScanner(scan) ;
      
              //d. 打印扫描信息
              for (Result r : rs) {
      
                  //单元格
                  Cell[] cells = r.rawCells() ;
      
                  for (Cell cs : cells) {
                      System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cs))) ;
                      System.out.println("ColumnFamilly:" + Bytes.toString(CellUtil.cloneFamily(cs))) ;
                      System.out.println("Column:" + Bytes.toString((CellUtil.cloneQualifier(cs)))) ;
                      System.out.println("Value:" + Bytes.toString((CellUtil.cloneValue(cs)))) ;
                  }
              }
      
              //成功标志
              System.out.println("scanAll----successful") ;
          }
      
          /**7. 删除表*/
          public static void deleteTable(String tableName) throws IOException {
      
              //加载配置
              Connection connection = ConnectionFactory.createConnection(conf) ;
      
              //a. 如果对表的操作需要使用管理器
              HBaseAdmin admin = (HBaseAdmin)connection.getAdmin() ;
      
              //b. 弃用表
              admin.disableTable(tableName) ;
      
              //c. 删除表
              admin.deleteTable(TableName.valueOf(tableName)) ;
      
              //成功标志
              System.out.println("deleteTable----successful") ;
          }
      
      
          /**主函数*/
          public static void main(String[] args) throws IOException {
      
              /**1. 判断HBase中表是否存在*/
      //        System.out.println(isExist("user")) ;
      
              /**2. 在HBase中创建表*/
      //        createTable("create1", "info1", "info2", "info3") ;
      
              /**3. 向表中添加数据:列族 列 值*/
      //        addData("create1", "xiaoming", "info1", "age", "18") ;
      //        addData("create1", "xiaoming", "info1", "sex", "man") ;
      //        addData("create1", "xiaoming", "info2", "professional", "student") ;
      //        addData("create1", "xiaohong", "info2", "professional", "teacher") ;
      
              /**4. 删除一个rowkey*/
      //        deleteRow("create", "xiaoming") ;
      
              /**5. 删除多个rowkey*/
      //        deleteRows("create", "xiaoming", "xiaohong") ;
      
              /**6. 全表扫描*/
      //        scanAll("create1") ;
      
              /**7. 删除表*/
              deleteTable("create2") ;
          }
      }
    

6. HBase MR操作

(1) 说明

  • HBase主要擅长的领域是存储数据,不擅长分析数据
  • HBase如果想计算的话需要结合Hadoop的mapreduce
  • hbase-mr所需的jar包查看:
    • bin/hbase mapredcp

(2) 准备工作

  • 方式1:配置临时环境变量
    • export HBASE_HOME=/XXXX/hbase-1.3.0
    • export HADOOP_HOME=/XXXX/hadoop2.8.4
    • export HADOOP_CLASSPATH_2=`$HADOOP_CLASSPATH_2:$HBASE_HOME/bin/hbase mapredcp` 
      
  • 方式2:配置系统环境变量
    • 修改配置文件:
      • vi /etc/profile
    • 增加以下内容:
      • export HBASE_HOME=/opt/module/hbase-1.3.0
      • export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
        
  • 声明环境变量:
    • source /etc/profile

(3) 运行HBase-MR程序

  • bin/yarn jar /XXXX/hbase-1.3.0/lib/hbase-server-1.3.0.jar row counter 表名

(4)实例1:HBase过滤列族和列

  • ReadMapper.java:

  • package ybcarry.mr;
    
      import org.apache.hadoop.hbase.Cell;
      import org.apache.hadoop.hbase.CellUtil;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.client.Result;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableMapper;
      import org.apache.hadoop.hbase.util.Bytes;
      
      import java.io.IOException;
      
      /**
       * @ClassName: ReadMapper
       * @Description
       * @Author: YBCarry
       * @Date2019-04-26 01:36
       * @Version: V1.0
       *
       * mapper类进行对数据
       * K:ImmutableBytesWritable HBase中的rowkey
       * V:Put 封装的数据
       **/
      public class ReadMapper extends TableMapper<ImmutableBytesWritable, Put> {
      
          @Override
          protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
      
              //1. 读取数据  根据rowkey拿到数据
              Put put = new Put(key.get()) ;
      
              //2. 过滤列
              for (Cell c : value.rawCells()) {
      
                  //得到info列族数据——是info列族-取出;不是info列族-过滤
                  if ("info".equals(Bytes.toString(CellUtil.cloneFamily(c)))) {
      
                      //过滤列
                      if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
                          put.add(c) ;
                      }
                  }
              }
      
              //3. 输出到reducer端:key -> rowkey  put -> 具体数据
              context.write(key, put) ;
          }
      }
    



  • WriteReducer.java:

  • package ybcarry.mr;
    
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableReducer;
      import org.apache.hadoop.io.NullWritable;
      
      import java.io.IOException;
      
      /**
       * @ClassName: WriteReducer
       * @Description
       * @Author: YBCarry
       * @Date2019-04-26 02:55
       * @Version: V1.0
       **/
      public class WriteReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
      
          @Override
          protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
      
              for (Put p : values) {
                  context.write(NullWritable.get(), p) ;
              }
          }
      }
    



  • Driver.java:

  • package ybcarry.mr;
    
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.hbase.HBaseConfiguration;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.client.Scan;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      /**
       * @ClassName: Driver
       * @Description
       * @Author: YBCarry
       * @Date2019-04-26 03:03
       * @Version: V1.0
       **/
      public class Driver implements Tool {
      
          private Configuration conf ;
      
          public void setConf(Configuration configuration) {
      
              this.conf = HBaseConfiguration.create(configuration) ;
          }
      
          public Configuration getConf() {
              return this.conf ;
          }
      
          public int run(String[] strings) throws Exception {
      
              //1. 创建任务
              Job job = Job.getInstance(conf) ;
      
              //2. 指定运行的主类
              job.setJarByClass(Driver.class) ;
      
              //3. 配置job
              Scan scan = new Scan() ;
      
              //4. 设置具体运行mapper类(table1为HBase中要处理的表的表名)
              TableMapReduceUtil.initTableMapperJob("table1",
                      scan,
                      ReadMapper.class,
                      ImmutableBytesWritable.class,
                      Put.class,
                      job
                      ) ;
      
              //5. 设置具体运行reducer类(table2为HBase中处理后生成表的表名,需要在HBase中提前创建好)
              TableMapReduceUtil.initTableReducerJob("table2",
                      WriteReducer.class,
                      job
                      );
      
              //6. 设置reduceTask
              job.setNumReduceTasks(1) ;
      
              boolean rs = job.waitForCompletion(true) ;
              return rs ? 0 : 1 ;
          }
      
          public static void main(String[] args) throws Exception {
      
              //状态码
              int sts = ToolRunner.run(new Driver(), args) ;
      
              //退出
              System.exit(sts) ;
          }
      }
    

  • 生成jar包,导入HBase环境中即可

(5)实例2:HBase过滤列族和列

  • ReadHDFSMapper.java:

  •   package ybcarry.mr2;
    
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.util.Bytes;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      
      import java.io.IOException;
      
      /**
       * @ClassName: ReadHDFSMapper
       * @Description
       * @Author: YBCarry
       * @Date2019-04-27 00:31
       * @Version: V1.0
       *
       * 读取HDFS中的数据:
       * 输入通过Hadoop MR
       * 输出通过HBase MR
       **/
      public class ReadHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
      
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      
              //1. 读取数据
              String line = value.toString() ;
      
              //2. 切分数据
              String[] fields = line.split("\t") ;
      
              //3. 封装数据
              //数据示例:001	YBCarry	21
              byte[] rowkey = Bytes.toBytes(fields[0]) ;
              byte[] name = Bytes.toBytes(fields[1]) ;
              byte[] age = Bytes.toBytes(fields[2]) ;
      
              //4. 封装成put
              Put put = new Put(rowkey) ;
              put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), name) ;
              put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), age) ;
      
              //5. 输出到reducer端
              context.write(new ImmutableBytesWritable(rowkey), put) ;
          }
      }
    



  • WriteHBaseReducer.java:

  •   package ybcarry.mr2;
    
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableReducer;
      import org.apache.hadoop.io.NullWritable;
      
      import java.io.IOException;
      
      /**
       * @ClassName: WriteHBaseReducer
       * @Description
       * @Author: YBCarry
       * @Date2019-04-27 02:29
       * @Version: V1.0
       *
       * hbase
       **/
      public class WriteHBaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
      
          @Override
          protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
      
              for (Put p : values) {
                  //rowkey不需要,所以NullWritable
                  context.write(NullWritable.get(), p) ;
              }
          }
      }
    



  • Driver.java:

  •   package ybcarry.mr2;
    
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hbase.HBaseConfiguration;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      /**
       * @ClassName: Driver
       * @Description
       * @Author: YBCarry
       * @Date2019-04-27 03:00
       * @Version: V1.0
       **/
      public class Driver implements Tool {
      
          //配置初始化
          private Configuration conf ;
      
          public void setConf(Configuration configuration) {
              this.conf = HBaseConfiguration.create(configuration) ;
          }
      
          public Configuration getConf() {
              return this.conf ;
          }
      
          public int run(String[] strings) throws Exception {
              //1. 创建任务
              Job job = Job.getInstance(conf) ;
      
              //2. 指定运行的主类
              job.setJarByClass(Driver.class) ;
      
              //3. 配置Mapper
              job.setMapperClass(ReadHDFSMapper.class) ;
              job.setMapOutputKeyClass(ImmutableBytesWritable.class) ;
              job.setMapOutputValueClass(Put.class) ;
      
              //4. 配置Reducer (表名table2为输出表的表名)
              TableMapReduceUtil.initTableReducerJob("table2", WriteHBaseReducer.class, job) ;
      
      
              //5. 输入配置 inputFormat (able1为初始表的路径)
              FileInputFormat.addInputPath(job, new Path("/table1")) ;
      
      
              //6. 设置reduceTask
              job.setNumReduceTasks(1) ;
      
              boolean rs = job.waitForCompletion(true) ;
              return rs ? 0 : 1 ;
          }
      
          public static void main(String[] args) throws Exception {
      
              //状态码
              int sts = ToolRunner.run(new Driver(), args) ;
      
              //退出
              System.exit(sts) ;
          }
      }
    

  • 生成jar包,导入HBase环境中即可

7. HBase 优化

(1) 预分区

  • 表很大、数据量大 —— region分片+分布式
  • region:
    • 每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。rowkey{startrowkey,endrowkey}
  • 预分区设置:
    • 方法1:create 'table1','info','partition',SPLITS => ['101','102','103','104']
    • 显示效果:
    • 方法2:create 'table2','partition',SPLITS_FILE => '/XXXX/partitions.txt'(文件放在hbase-shell路径下或者使用绝对路径)
    • partitions.txt内容:
      • aa
        bb
        cc
        dd
        ee
        
    • 显示效果:

(2) rowkey设计

  • 一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜。
  • 解决方案:
    • 目标:打散数据,平均分配到每个region中
    • 方案1——生成随机数、hash、散列值
      • 一般会选择从数据集中抽取样本,来决定什么样的rowKey来Hash后作为每个分区的临界值。
    • 方案2——字符串拼接:
      • 20190427_a3d4
      • 20190427_jl4f
    • 方案3——反转字符串:
      • 201904271 -> 172409102
      • 201904272 -> 272409102

(3) HBase内存优化

  • HBase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给HBase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般最多分配75%内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。
  • vi hadoop-env.sh 设置内存的堆大小
    • export HADOOP_PORTMAP_OPTS='-XmxAAAAm $HADOOP_PORTMAP_OPTS'(AAAA为内存值)

(4) HBase基础优化

  • HBase基于HDFS存储:
    • 调大datanode允许最大文件打开数(默认4096):
      • HBase一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作
      • vi hdfs-site.xml
      • <property>
            <name>dfs.datanode.max.transfer.threads</name>
            <value>XXXX</value>
        </property>
        
    • 调大等待时间(默认60000毫秒):
      • 如果对于某一次数据操作来讲,延迟非常高,socket需要等待更长的时间,建议把该值设置为更大的值,以确保socket不会被timeout掉
      • vi hdfs-site.xml
      • <property>
            <name>dfs.image.transfer.timeout</name>
            <value>XXXX</value>
        </property>
        
  • 高级优化:
    • 1.允许在HDFS的文件中追加内容(hdfs-site.xml、hbase-site.xml)
      • 属性:dfs.support.append
      • 解释:开启HDFS追加同步,可以优秀的配合HBase的数据同步和持久化。默认值为true。
    • 2.优化数据的写入效率(mapred-site.xml)
      • 属性:mapreduce.map.output.compress、mapreduce.map.output.compress.codec
      • 解释:开启这两个数据可以大大提高文件的写入效率,减少写入时间。第一个属性值修改为true,第二个属性值修改为:org.apache.hadoop.io.compress.GzipCodec或者其他压缩方式。
    • 3.设置RPC监听数量(hbase-site.xml)
      • 属性:hbase.regionserver.handler.count
      • 解释:默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。
    • 4.优化HStore文件大小(hbase-site.xml)
      • 属性:hbase.hregion.max.filesize
      • 解释:默认值10737418240(10GB),如果需要运行HBase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。
    • 5.优化hbase客户端缓存(hbase-site.xml)
      • 属性:hbase.client.write.buffer
      • 解释:用于指定HBase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。
    • 6.指定scan.next扫描HBase所获取的行数(hbase-site.xml)
      • 属性:hbase.client.scanner.caching
      • 解释:用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。
    • 7.flush、compact、split机制
      • 当MemStore达到阈值,将Memstore中的数据Flush进Storefile;compact机制则是把flush出来的小文件合并成大的Storefile文件。split则是当Region达到阈值,会把过大的Region一分为二。
      • 涉及属性:
        • 128M就是Memstore的默认阈值
        • hbase.hregion.memstore.flush.size:134217728
          • 这个参数的作用是当单个HRegion内所有的Memstore大小总和超过指定值时,flush该HRegion的所有memstore。RegionServer的flush是通过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会导致内存陡增,最坏的情况是触发OOM。
        • hbase.regionserver.global.memstore.upperLimit:0.4
          hbase.regionserver.global.memstore.lowerLimit:0.38
          
          • 当MemStore使用内存总量达到hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个MemStores flush到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到MemStore使用内存略小于lowerLimit