Storm 应用实例 -- 集成 HBase

1,340 阅读6分钟

本文展示一个Storm的topology,该topology对给定的词源进行词频统计,然后存入HBase,该实例不借助storm-hbase包,而是直接使用hbase client来完成对HBase的操作。

引言

由Twitter开源的、分布式实时计算系统Apache Storm,如今已被多家知名企业应用于实时分析、流式计算、在线机器学习、分布式RPC调用、ETL等领域,甚至有看到“Storm之于实时计算,就像Hadoop之于数据批处理”这样的评价,是否言过其实,这里暂且不论,但至少已经看到业界对Storm在实时计算领域的肯定,加之其开源特性,必然会得到更广泛的应用。
在Storm的实际应用中,在topology中将经过处理的数据通过HBase进行持久化,是一个常见的需求。Storm官方提供了storm-hbase,包含一些比较通用的API及其简单实现,可以查看对应的官方文档来了解基本使用方法:storm-hbase。但如果你需要进行一些更复杂的处理,或者希望对自己的代码有更多的掌控,那么脱离storm-hbase,直接使用HBase的Java API来完成操作,将是一个不错的选择。本文将展示的,就是一个在Storm的topology中直接使用HBase Java API操作HBase的简单示例。

零.示例简述

本项目数据源部分直接借用Storm词频统计的官方示例,在WordSpout.java中从静态字符串数组中读取单词,在WordCounterBolt.java中统计单词出现的次数,最后在MyHBaseBolt.java中将单词及其出现的次数写入到HBase。

一.环境信息

示例的测试环境:

  • Java 8
  • Storm 1.0.1
  • HBase 1.2.2
  • Hadoop 2.6.4
  • Maven 3.3.3

二.创建项目

示例直接使用hbase client操作HBase,因此关键的依赖只有storm和hbase client,项目pom.xml:


  UTF-8
  1.0.1
  
  compile



    
        org.apache.storm
        storm-core
        ${storm.version}
        ${storm.scope}
    
    
        org.apache.hbase
        hbase-client
        1.2.2
    

项目结构:

--src
   --main
       --java
          --bolt
              --MyHBaseBolt.java
              --WordCounterBolt.java
          --spout
              --WordSpout.java
          --HBaseTopology.java
       --resources
           --hbase-site.xml

其中hbase-site.xml直接使用HBase服务器上面的hbase-site.xml即可。本示例的HBase集群使用独立的zookeeper集群,zk的端口使用了默认端口,因此不需要在hbase-site.xml中显式配置,详细内容见附录。

三.词频统计

这部分直接借用一个Storm官方示例:WordSpout.java从静态数组中随机读取单词并向外发射,WordCounterBolt接收来自WordSpout的包含一个个单词的tuple,对每个单词出现的次数进行统计,然后将每个单词及其对应的计数向外发射。为快速进入主题,这部分代码放在附录中。

四.HBase操作

在java中通过hbase client对hbase进行读写大体有如下步骤:

  • 创建HBaseConfiguration对象,该对象可以读取CLASSPATH下的hbase-site.xml文件的内容。
    Configuration config = HBaseConfiguration.create();
  • 用前面的config对象为入参创建Connection对象来连接至目标HBase集群。connection对象对资源消耗较大,应该避免创建过多的实例。使用完毕后,调用connection的close()方法关闭连接,建议使用try/finally来确保连接的关闭。
    Connection connection = ConnectionFactory.createConnection(config);
  • 以指定的table名称(应该是已存在的)为入参创建Table对象来连接指定的表。使用完毕后,需要调用table的close()方法进行关闭。与connection不同,table对象是轻量的,对table对象的创建,不需要像connection那样小心,当然,这并不是鼓励你创建得越多越好。
    Table table = connection.getTable(TableName.valueOf("WordCount"));
  • 以指定的row key(可以是在HBase中还不存在的)为入参创建Put对象来持有要写入的数据。
    Put p = new Put(Bytes.toBytes("key"));
  • 调用Put对象的addColumn方法,接受列族名称(column family)、列名(column qualifier)和要写入的值作为参数。可以多次调用该方法让put对象持有一定数量的数据后,再一次性提交。
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("words"), Bytes.toBytes("word"));
  • 以Put对象为入参,调用table的put方法来提交要写入hbase的数据
  • 关闭table
  • 关闭connection

在Storm的bolt中进行实际应用:

public class MyHBaseBolt extends BaseBasicBolt {
    private Connection connection;
    private Table table;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration config = HBaseConfiguration.create();
        try {
            connection = ConnectionFactory.createConnection(config);
//示例都是对同一个table进行操作,因此直接将Table对象的创建放在了prepare,在bolt执行过程中可以直接重用。
            table = connection.getTable(TableName.valueOf("WordCount"));
        } catch (IOException e) {
            //do something to handle exception
        }
    }
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //从tuple中获取单词
        String word = tuple.getString(0);
        //从tuple中获取计数,这里转换为String只是为了示例运行后存入hbase的计数值能够直观显示。
        String count = tuple.getInteger(1).toString();
        try {
            //以各个单词作为row key
            Put put = new Put(Bytes.toBytes(word));
            //将被计数的单词写入cf:words列
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("words"), Bytes.toBytes(word));
            //将单词的计数写入cf:counts列
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("counts"), Bytes.toBytes(count));
            table.put(put);
        } catch (IOException e) {
            //do something to handle exception
        }
    }
    @Override
    public void cleanup() {
        //关闭table
        try {
            if(table != null) table.close();
        } catch (Exception e){
            //do something to handle exception
        } finally {
            //在finally中关闭connection
            try {
                connection.close();
            } catch (IOException e) {
                //do something to handle exception
            }
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //示例中本bolt不向外发射数据,所以没有再做声明
    }
}

虽然可能应用场景相对较少,但还是附带介绍一下从HBase读取数据:

  • 以指定的row key为入参创建Get对象
    Get get = new Get(Bytes.toBytes("key"));
  • 以Get实例为入参调用table的get方法来获取结果集对象Result
    Result r = table.get(get);
  • 从结果集中获取制定列的值
    byte[] value = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("words"));
  • 也可以使用scan来批量读取,Scanner实现了Iterable,因此可以使用foreach来进行遍历:

    Scan scan = new Scan();
    //获取指定列族所有列的数据
    scan.addFamily(Bytes.toBytes("cf"));
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {...}
    }finally{
      scanner.close();
      }
    

    五.Topology

    topology中唯一需要注意的是,在Windows测试该示例时,需要配置hadoop.home.dir属性,并确保将winutils.exe客户端(示例中使用的版本(链接若失效请自助))放置在所配置的hadoop.home.dir目录下(资料解释:在hadoop 2.x版本的包中不再包含winutils.exe文件)。
    HBaseTopology.java:

    public class PersistentWordCount {
      private static final String WORD_SPOUT = "WORD_SPOUT";
      private static final String COUNT_BOLT = "COUNT_BOLT";
      private static final String HBASE_BOLT = "HBASE_BOLT";
    
      public static void main(String[] args) throws Exception {
          System.setProperty("hadoop.home.dir","E:/BaiduYunDownload");
    
          Config config = new Config();
    
          WordSpout spout = new WordSpout();
          WordCounter bolt = new WordCounter();
          MyHBaseBolt hbase = new MyHBaseBolt();
    
          // wordSpout ==> countBolt ==> HBaseBolt
          TopologyBuilder builder = new TopologyBuilder();
    
          builder.setSpout(WORD_SPOUT, spout, 1);
          builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
          builder.setBolt(HBASE_BOLT, hbase, 10).fieldsGrouping(COUNT_BOLT, new Fields("word"));
    
          if (args.length == 0) {
              LocalCluster cluster = new LocalCluster();
              cluster.submitTopology("word", config, builder.createTopology());
              Thread.sleep(10000);
              cluster.killTopology("word");
              cluster.shutdown();
              System.exit(0);
          } else {
              config.setNumWorkers(3);
              StormSubmitter.submitTopology(args[0], config, builder.createTopology());
          }
    }
    

    如果编译遇到类似:java.io.IOException: No FileSystem for scheme: hdfs这样关于hadoop的问题,可能需要添加hadoop相关依赖包,如:

    
      org.apache.hadoop
      hadoop-common
      2.6.4
    
    
      org.apache.hadoop
      hadoop-hdfs
      2.6.4
    
    

六.总结

本文通过一个词频统计后通过HBase进行结果持久化的topology示例,展示了如何在Storm的中直接使用HBase的java api来实现基本的读写操作,希望能为想自己完成Storm的HBase集成而不得其法的朋友提供一个入门指引。

附录

  1. WordSpout.java:

    public class WordSpout extends BaseRichSpout {
     private SpoutOutputCollector collector;
     private static final String[] MSGS = new String[]{
             "Storm", "HBase", "Integration", "example", "by ", "aloo", "in", "Aug",
     };
    
     private static final Random random = new Random();
    
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word"));
     }
    
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         this.collector = collector;
     }
    
     @Override
     public void nextTuple() {
         String word = MSGS[random.nextInt(8)];
         collector.emit(new Values(word));
     }
    }
    
  2. WordCounterBolt.java:

    public class WordCounter extends BaseBasicBolt {
     private Map _counts = new HashMap();
    
     @Override
     public void execute(Tuple tuple, BasicOutputCollector collector) {
         String word = tuple.getString(0);
         int count;
         if(_counts.containsKey(word)){
             count = _counts.get(word);
         } else {
             count = 0;
         }
         count ++;
         _counts.put(word, count);
         collector.emit(new Values(word, count));
     }
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word", "count"));
     }
    }
    
  3. hbase-site.xml
    
    
    
     
         hbase.cluster.distributed
         true
     
     
         hbase.rootdir
         hdfs://xxx.xx.xx.xx:9000/hbase
     
     
         hbase.zookeeper.property.dataDir
         /home/hadoop/hbase/storm/zookeeper
     
     
         hbase.zookeeper.quorum
         zknode1,zdnode2,zknode3