MySQL to HBase with Sqoop


Writing directly into HBase with Sqoop

Use case

Importing small volumes of data.

Template

sqoop import \
      --connect jdbc:mysql://<ip:port>/<database> \
      --username <name> \
      --password <psd> \
      --table <table> \
      --hbase-table <hbaseTable> \
      --column-family cf \
      --hbase-row-key id \
      --hbase-create-table

# --connect specifies the source MySQL database
# --table specifies the source MySQL table
# --hbase-table specifies the target HBase table
# --column-family specifies the column family to write into
# --hbase-row-key specifies which column to use as the row key
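
A concrete sketch of the same command with hypothetical host, credentials, and table names filled in (the MySQL table name matches the test run below):

# Hypothetical values -- substitute your own connection details and names.
sqoop import \
      --connect jdbc:mysql://192.168.1.10:3306/testdb \
      --username sqoop_user \
      --password 'secret' \
      --table operate_test \
      --hbase-table operate_test \
      --column-family cf \
      --hbase-row-key id \
      --hbase-create-table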

HBase import options (from the Sqoop user guide):

Argument                            Description
--hbase-table <table-name>          Specifies an HBase table to use as the target instead of HDFS
--column-family <family>            Sets the target column family for the import
--hbase-create-table                If specified, create missing HBase tables
--hbase-row-key <col>               Specifies which input column to use as the row key; for a composite key, supply a comma-separated list of the key columns
--hbase-bulkload                    Enables bulk loading
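
If you prefer not to rely on --hbase-create-table, the target table and column family can be created up front in the hbase shell (the table name here stands for whatever is passed to --hbase-table in the template above):

# Pre-create the target table with column family 'cf'; --hbase-create-table can then be omitted.
hbase shell <<'EOF'
create 'hbaseTable', 'cf'
EOF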

Test results:

18/11/14 12:08:56 INFO db.DBInputFormat: Using read commited transaction isolation
18/11/14 12:08:56 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `operate_test`
18/11/14 12:08:56 INFO db.IntegerSplitter: Split size: 375584; Num splits: 4 from: 1 to: 1502338  // shows the mysql table was split into 4 parts
18/11/14 12:08:56 INFO mapreduce.JobSubmitter: number of splits:
18/11/14 12:08:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542118887194_0001
18/11/14 12:08:56 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: 10.16.208.171:8020, Ident: (token for hbase: HDFS_DELEGATION_TOKEN owner=xxxx, renewer=yarn, realUser=, issueDate=1542168533283, maxDate=1542773333283, sequenceNumber=73, masterKeyId=72)
18/11/14 12:08:57 INFO impl.YarnClientImpl: Submitted application application_1542118887194_0001
18/11/14 12:08:57 INFO mapreduce.Job: The url to track the job: http://xxxx/proxy/application_1542118887194_0001/
18/11/14 12:08:57 INFO mapreduce.Job: Running job: job_1542118887194_0001
18/11/14 12:09:06 INFO mapreduce.Job: Job job_1542118887194_0001 running in uber mode : false
18/11/14 12:09:06 INFO mapreduce.Job:  map 0% reduce 0%
18/11/14 12:09:46 INFO mapreduce.Job:  map 25% reduce 0%
18/11/14 12:09:47 INFO mapreduce.Job:  map 75% reduce 0%
18/11/14 12:09:49 INFO mapreduce.Job:  map 100% reduce 0%
18/11/14 12:09:50 INFO mapreduce.Job: Job job_1542118887194_0001 completed successfully
18/11/14 12:09:50 INFO mapreduce.Job: Counters: 30
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=877976
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=431
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=4
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
    Job Counters
        Launched map tasks=4
        Other local map tasks=4
        Total time spent by all maps in occupied slots (ms)=157766
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=157766
        Total vcore-milliseconds taken by all map tasks=157766
        Total megabyte-milliseconds taken by all map tasks=161552384
    Map-Reduce Framework
        Map input records=1497708
        Map output records=1497708
        Input split bytes=431
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=948
        CPU time spent (ms)=207440
        Physical memory (bytes) snapshot=1978642432
        Virtual memory (bytes) snapshot=11696033792
        Total committed heap usage (bytes)=2478833664
    File Input Format Counters
        Bytes Read=0
    File Output Format Counters
        Bytes Written=0
18/11/14 12:09:50 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 57.7245 seconds (0 bytes/sec)
18/11/14 12:09:50 INFO mapreduce.ImportJobBase: Retrieved 1497708 records.
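
A quick way to sanity-check the result is to count and peek at the target table in the hbase shell (hypothetical table name, matching whatever was passed to --hbase-table):

# Count the rows in the imported table and look at a few of them.
hbase shell <<'EOF'
count 'operate_test', INTERVAL => 500000
scan 'operate_test', {LIMIT => 3}
EOF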

Importing data via bulkload

Use case

Importing large volumes of data.

Template

sqoop import \
         --connect jdbc:mysql://ip:port/database?tinyInt1isBit=false \
         --username xxxxxxx \
         --password 'xxxxxxxxxxxxxx' \
         --query 'SELECT concat(a, b, c) as hbrk, id, a, b, c FROM tttt WHERE $CONDITIONS' \
         --verbose \
         --hbase-table plt:tttt \
         --column-family d \
         --hbase-bulkload \
         --split-by id \
         --hbase-row-key hbrk -m 30


Tips:
# --hbase-table : name of the target HBase table;
# --column-family : the HBase column family to write into;
# --hbase-bulkload : write HFiles and bulk-load them instead of issuing Puts row by row;
# --split-by : column used to split the work among map tasks; note it must be a column of the MySQL table and must appear in the SELECT part of the query;
# --hbase-row-key : column to use as the rowkey;
# -m : number of map tasks;
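
For context on how --split-by and $CONDITIONS interact: Sqoop first runs a bounding query over the split column (compare the BoundingValsQuery line in the first test log above), then each of the -m mappers substitutes its own range for $CONDITIONS. A sketch of what one mapper effectively executes, with made-up id boundaries and the connection placeholders from the first template:

# Illustrative only -- hypothetical id range for one of the 30 mappers.
mysql -h <ip> -P <port> -u <name> -p'<psd>' <database> -e \
  "SELECT concat(a, b, c) AS hbrk, id, a, b, c FROM tttt WHERE ( id >= 1 ) AND ( id < 50079 )"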

Problems encountered along the way:

1. The YARN gateway needs to be added (deploy the YARN client configuration to the host running sqoop; see the check after the stack trace below)

19/01/14 16:48:00 INFO mapreduce.HFileOutputFormat2: Incremental table test:cra_data_detail_new output configured.
19/01/14 16:48:00 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
19/01/14 16:48:00 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
19/01/14 16:48:00 WARN security.UserGroupInformation: PriviledgedActionException as:test@CDHTEST.COM (auth:KERBEROS) cause:java.io.IOException: Can't get Master Kerberos principal for use as renewer
19/01/14 16:48:00 DEBUG util.ClassLoaderStack: Restoring classloader: sun.misc.Launcher$AppClassLoader@17ed40e0
19/01/14 16:48:00 ERROR tool.ImportTool: Import failed: java.io.IOException: Can't get Master Kerberos principal for use as renewer
	at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:133)
	at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
	at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:142)
	at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:270)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:143)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1307)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1304)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1304)
	at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1325)
	at org.apache.sqoop.mapreduce.ImportJobBase.doSubmitJob(ImportJobBase.java:203)
	at org.apache.sqoop.mapreduce.ImportJobBase.runJob(ImportJobBase.java:176)
	at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:273)
	at org.apache.sqoop.manager.SqlManager.importQuery(SqlManager.java:747)
	at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:515)
	at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:621)
	at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
	at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
	at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243)
	at org.apache.sqoop.Sqoop.main(Sqoop.java:252)
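
The "Can't get Master Kerberos principal for use as renewer" error usually means the submitting host has no YARN client configuration, so the job cannot find yarn.resourcemanager.principal. After deploying the YARN gateway role (client configuration) to the host that runs sqoop, a quick check (path assumes a CDH-style layout):

# Verify the YARN client config is present and contains the ResourceManager principal.
grep -A1 'yarn.resourcemanager.principal' /etc/hadoop/conf/yarn-site.xml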

2. --split-by must be a column of the MySQL table

19/01/14 17:14:44 INFO mapreduce.Job: Task Id : attempt_1542122034332_12046_m_000000_0, Status : FAILED
Error: java.io.IOException: SQLException in nextKeyValue
	at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:277)
	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:562)
	at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
	at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'mysql_id' in 'where clause'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
	at com.mysql.jdbc.Util.getInstance(Util.java:408)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:944)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
	at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
	at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
	at org.apache.sqoop.mapreduce.db.DBRecordReader.executeQuery(DBRecordReader.java:111)
	at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:235)
	... 12 more
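
Here the job split on a column (mysql_id) that does not exist in the source table. A quick way to list the real columns before choosing --split-by (connection placeholders as in the templates above):

# List the columns of the source table, then pick one of them for --split-by
# (e.g. --split-by id instead of --split-by mysql_id).
mysql -h <ip> -P <port> -u <name> -p'<psd>' <database> -e "SHOW COLUMNS FROM tttt"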

3. The SELECT in --query must include the --split-by column

Reference: stackoverflow.com/questions/4…

Reason:

First, Sqoop fetches metadata (column details) from the RDBMS based on your query.

Using query:

select accounts.first_name FROM accounts JOIN accountdevice
 ON (accounts.acct_num = accountdevice.account_id) WHERE 1 = 0

Note that $CONDITIONS is replaced with 1 = 0 to fetch the metadata.

Now your query returns only one column, first_name, while you are splitting on acct_num, which is not selected from the RDBMS table. That is why you get the Unknown column error.

So make sure you also SELECT the split-by column in your SQL query.

19/01/14 17:56:56 INFO mapreduce.JobSubmitter: Cleaning up the staging area /user/test/.staging/job_1542122034332_12071
19/01/14 17:56:56 WARN security.UserGroupInformation: PriviledgedActionException as:test@CDHTEST.COM (auth:KERBEROS) cause:java.io.IOException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'id' in 'field list'
19/01/14 17:56:56 DEBUG util.ClassLoaderStack: Restoring classloader: sun.misc.Launcher$AppClassLoader@17ed40e0
19/01/14 17:56:56 ERROR tool.ImportTool: Import failed: java.io.IOException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'id' in 'field list'
	at org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.getSplits(DataDrivenDBInputFormat.java:207)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:305)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:322)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:200)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1307)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1304)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1304)
	at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1325)
	at org.apache.sqoop.mapreduce.ImportJobBase.doSubmitJob(ImportJobBase.java:203)
	at org.apache.sqoop.mapreduce.ImportJobBase.runJob(ImportJobBase.java:176)
	at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:273)
	at org.apache.sqoop.manager.SqlManager.importQuery(SqlManager.java:747)
	at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:515)
	at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:621)
	at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
	at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
	at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243)
	at org.apache.sqoop.Sqoop.main(Sqoop.java:252)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'id' in 'field list'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
	at com.mysql.jdbc.Util.getInstance(Util.java:408)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:944)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2482)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2440)
	at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381)
	at org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.getSplits(DataDrivenDBInputFormat.java:178)
	... 22 more
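
In short, the failing and working patterns differ only in whether the split column appears in the SELECT list; a hedged reconstruction of the two:

# Failing pattern (hypothetical reconstruction): the split column 'id' is not selected.
#   --query 'SELECT a, b, c FROM tttt WHERE $CONDITIONS' --split-by id
# Working pattern: the split column is in the SELECT list, as in the bulkload template above.
#   --query 'SELECT concat(a, b, c) as hbrk, id, a, b, c FROM tttt WHERE $CONDITIONS' --split-by id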