前言
在一些大佬的博客已查不到HBase2.x最新的实践代码,从某书上粘贴来的代码在新版本下执行不了,因此写下本篇实践,从HBase 1.4.2等老版本升级而来,想要使用Spark读写HBase2.0 API的可借鉴本文。ps:官网挂的示例也报错!(因为没找到依赖o(╯□╰)o)
代码环境
- Spark 2.3.1 (2.2,2.3.x系列应该都能用)
- HBase 2.0.0 (与Hbase 1.x系列不兼容)
- IDEA 2019.1 社区版
准备工作
HBase shell创建表
# hbase shell
> list //查看表
> create 'spark_hbase_src', 'info' //创建一张数据源表
> create 'spark_hbase_res', 'info' //创建一张结果表,用来写入计算结果
以上两张表就创建好了,简单。
准备示例数据
数据模型:模拟路上车辆的经过记录,为csv格式文本文件(txt)
- 字段5个:车牌号、车牌颜色、拍照设备编号、行驶方向、记录时间
- 对应英文:"number", "color", "device", "direction", "photo_time"
- 示例数据:模拟数据仅供参考
车牌号 | 车牌颜色 | 设备编号 | 行驶方向 | 记录时间 |
---|---|---|---|---|
豫A12345 | 蓝色 | D12C01 | 南北 | 2019/10/16 12:00:00 |
豫B12121 | 黄色 | D13C06 | 南北 | 2019/10/10 12:11:00 |
豫C66666 | 蓝色 | D15C08 | 西东 | 2019/10/29 12:09:00 |
豫D11111 | 蓝色 | D18C07 | 北南 | 2019/10/18 12:15:00 |
自己模拟生成一些文本数据,上传到hdfs,也可以在本机。
Maven依赖
HBase Server API
<!-- Hbase server库 提供Hbase读写API-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
之前只需要这个HBase jar就可以了,实践中有报错:
错误1
- Error 1:无法import org.apache.hadoop.hbase.mapreduce.TableInputFormat
解决办法
导入这个包:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>${hbase.version}</version>
</dependency>
错误2
- Error 2:找不到org.apache.htrace.SamplerBuilder类
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/htrace/SamplerBuilder
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.SamplerBuilder
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more
解决办法
导入这个包:
<!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core -->
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>3.1.0-incubating</version>
</dependency>
Spark等依赖
其他spark-core等依赖自行添加:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!-- Spark核心库 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--Spark sql库 提供DF类API -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
Spark写入HBase
代码实践
Ctrl+c自取:
import java.util.UUID
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
import scala.util.Try
object SparkWriteHBase {
val hbaseConfig = HBaseConfiguration.create()
hbaseConfig.set("hbase.zookeeper.quorum", "zk地址1,zk地址2,zk地址3")
hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
//根据自己集群设置如下一行配置值
config.set("zookeeper.znode.parent","/hbase-unsecure")
//在IDE中设置此项为true,避免出现"hbase-default.xml"版本不匹配的运行时异常
hbaseConfig.set("hbase.defaults.for.version.skip", "true")
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Spark-HBase")
.master("local[2]")
.getOrCreate()
//读取的示例数据
val data = spark.read.csv("hdfs://your-hdfs-host:8020/traffic.txt")
.toDF("number", "color", "device", "direction", "photo_time")
println("数据条数是:" + data.count())
val SRC_FAMILYCOLUMN = "info"
data.foreachPartition(p => {
//获取HBase连接
val hbaseConn = ConnectionFactory.createConnection(hbaseConfig)
val resultTable = TableName.valueOf("spark_hbase_src")
//获取表连接
val table = hbaseConn.getTable(resultTable)
p.foreach(r => {
val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("number"), Bytes.toBytes(r.getString(0)))
put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("color"), Bytes.toBytes(r.getString(1)))
put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("device"), Bytes.toBytes(r.getString(2)))
put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("direction"), Bytes.toBytes(r.getString(3)))
put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("photo_time"), Bytes.toBytes(r.getString(4)))
Try(table.put(put)).getOrElse(table.close()) //将数据写入HBase,若出错关闭table
})
table.close()
hbaseConn.close()
})
}
}
写操作结果查看
写入前后数据量对比:0 -> 1199:
Spark读取HBase
代码实践
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
object SparkReadHbase {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Spark-HBase")
.master("local")
.getOrCreate()
val hbaseConfig = HBaseConfiguration.create()
hbaseConfig.set("hbase.zookeeper.quorum", "zk地址1,zk地址2,zk地址3")
hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
//在IDE中设置此项为true,避免出现"hbase-default.xml"版本不匹配的运行时异常
hbaseConfig.set("hbase.defaults.for.version.skip", "true")
hbaseConfig.set(TableInputFormat.INPUT_TABLE, "spark_hbase_src")
val SRC_FAMILYCOLUMN = "info"
//从hbase中读取RDD
val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(hbaseConfig,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
import spark.implicits._
hbaseRDD.map({ case (_, result) =>
// val key = Bytes.toString(result.getRow)
val number = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "number".getBytes))
val color = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "color".getBytes))
val device = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "device".getBytes))
val direction = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "direction".getBytes))
val photo_time = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "photo_time".getBytes))
(number, color, device, direction, photo_time)
}).toDF("number", "color", "device", "direction", "photo_time").show(false)
}
}
运行结果
show()的打印截图~成功读取到HBase中的数据:
官网示例踩坑
官网的 Example 36. HBaseContext Usage Example 如下:
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
...
val hbaseContext = new HBaseContext(sc, config)
- 不知道HBaseContext是哪一个jar包引入的,官网没指名用的什么Maven依赖!(后文看到可从一个项目可编译,mvn也提供了一个1.0版本jar包)
- new SparkContext("local", "test") 这种写法是这个包独有的。详情如下:
2019-10-10我编译了一下这个源码得到jar包,同时mvn官网也提供了一个1.0版本的依赖可以用 ↓↓↓(传送门)
编译Hbase Spark Connector指南
提供spark读写hbase的api,可作为hbase-server库之外的另一种选择↑↑↑
使用Spark RDD写HBase
由以下两种,主要区别是使用的配置文件对象不同
saveAsHadoopDataset
使用Hadoop JobConf配置,初始化JobConf用的TableOutputFormat类 是 org.apache.hadoop.hbase.mapred 包下的。
saveAsNewAPIHadoopDataset
使用Hadoop Configuration配置,使用的 TableInputFormat 类是 org.apache.hadoop.hbase.mapreduce 包下的
这两个API的使用方法类似,示例如下:
代码实现
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession
object SparkWriteHBaseByHadoopDataset {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("SparkWriteHBase2").master("local").getOrCreate()
val sc = spark.sparkContext
val tableName = "test_student"
val config = HBaseConfiguration.create()
config.set("hbase.zookeeper.quorum", "manager.bigdata,master.bigdata,worker.bigdata")
config.set("hbase.zookeeper.property.clientPort", "2181")
config.set("hbase.defaults.for.version.skip", "true")
val inputDataRDD = sc.parallelize(Array("1,Jack,M,26", "2,Rose,M,17")) //模拟构建两行记录的RDD
val rdd = inputDataRDD.map(_.split(',')).map { arr => {
val put = new Put(Bytes.toBytes(arr(0))) //行健的值
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1))) //info:name列的值
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2))) //info:gender列的值
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt)) //info:age列的值
(new ImmutableBytesWritable, put)
}
}
// 初始化JobConf,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的
val jobConf = new JobConf(config)
jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
rdd.saveAsHadoopDataset(jobConf)
//TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的
config.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(config)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
}
两种API方法底层均调用SparkHadoopWriter
对象的write方法,无性能差异。
Spark创建HBase表
核心API代码示例
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}
def main(args: Array[String]): Unit = {
val hbaseConn = ConnectionFactory.createConnection(hbaseConfig)
val admin = hbaseConn.getAdmin
//如果不存在就创建表
if (!admin.tableExists(TableName.valueOf("test_hb_new_api"))) {
val desc = TableDescriptorBuilder.newBuilder(TableName.valueOf("test_hb_new_api"))
//指定列簇 不需要创建列,列式存储不需要创建列
val cf1 = ColumnFamilyDescriptorBuilder.newBuilder("cf1".getBytes()).build()
desc.setColumnFamily(cf1)
admin.createTable(desc.build())
}
}
仅仅创建表,不需要spark的参与,写在Spark代码里当然也可以运行!
已过时的API
val desc = new HTableDescriptor(TableName.valueOf("hb_test"))
//这些API已经被标记为Deprecated,将会在HBase3.0移除!
val hcd = new HColumnDescriptor("cf")
Spark删除HBase表(接创建HBase表)
admin变量的来历在创建的代码里有。删表也和spark没啥关系,就是调用。
//drop table
admin.disableTable(table_name)
admin.deleteTable(table_name)
SparkSQL操作HBase (SHC)
哈 ┐(゚~゚)┌,这个待续等了半年才来补充(2020-05)
来自 Hortonworks 的工程师们为我们带来了全新的 Apache Spark—Apache HBase Connector,下面简称 SHC。通过这个类库,我们可以直接使用 Spark SQL 将 DataFrame 中的数据写入到 HBase 中;而且我们也可以使用 Spark SQL 去查询 HBase 中的数据。
看了一下,大佬介绍的一个SHC用法:SHC:使用 Spark SQL 高效地读写 HBase,无论怎样,HBase catalog类似的信息都要手写,大概这样的:
val catalog = s"""{
|"table":{"namespace":"default", "name":"iteblog", "tableCoder":"PrimitiveType"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"id", "type":"int"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
其实上面读取数据之后有toDF()就是SparkSQL的天下了,没必要那么纠结。(2020-0603)