编译
# Hive编译
mvn clean package -DskipTests -Phadoop-2 -Pdist
# 安装包目录
cd packaging/target/apache-hive-{version}-SNAPSHOT-bin/apache-hive-{version}-SNAPSHOT-bin
schematool
- schematool -initSchema -dbType mysql 类:HiveSchemaTool,main方法中执行初始化sql: 比如:scripts/metastore/upgrade/mysql/hive-schema-2.3.0.mysql.sql
/**
 * Initializes the metastore schema to {@code toVersion} by running the matching init script
 * through Beeline. When {@code dryRun} is set, only the script name is printed and nothing runs.
 *
 * @param toVersion schema version whose init script should be executed
 * @throws HiveMetaException wrapping any IOException raised while running the init script
 *     (the helpers called before the script runs may also throw it)
 */
public void doInit(String toVersion) throws HiveMetaException {
  testConnectionToMetastore();
  System.out.println("Starting metastore schema initialization to " + toVersion);
  String scriptDir = metaStoreSchemaInfo.getMetaStoreScriptDir();
  String scriptFile = metaStoreSchemaInfo.generateInitFileName(toVersion);
  try {
    System.out.println("Initialization script " + scriptFile);
    // Dry run: report which script would be used, but do not execute it.
    if (dryRun) {
      return;
    }
    runBeeLine(scriptDir, scriptFile);
    System.out.println("Initialization script completed");
  } catch (IOException e) {
    throw new HiveMetaException("Schema initialization FAILED!"
        + " Metastore state would be inconsistent !!", e);
  }
}
hive-client
hive --debug --hiveconf hive.root.logger=INFO,console 类: org.apache.hive.beeline.cli.HiveCli
技术栈
serde antlr
启动命令
java -Xmx2048m -Djava.net.preferIPv4Stack=true -Duser.name=mart_dos
-Djava.util.logging.config.file=/software/servers/cloudeco/mart_dos/hive/conf/parquet-logging.properties
-Dhadoop.log.dir=/data0/hadoop-logs/ -Dhadoop.log.file=hadoop.log
-Dhadoop.home.dir=/data0/hadoop/hadoop_2.100.52_2020081716
-Dhadoop.id.str=mart_dos
-Dhadoop.root.logger=INFO,console
-Djava.library.path=/lib64/::/data0/hadoop/hadoop_2.100.52_2020081716/lib/native
-Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true
-Djava.util.logging.config.file=/software/servers/cloudeco/mart_dos/hive/conf/parquet-logging.properties
-Dhadoop.security.logger=INFO,NullAppender
org.apache.hadoop.util.RunJar
/software/servers/cloudeco/mart_dos/hive/lib/hive-cli-2.0.0-SNAPSHOT.jar
org.apache.hadoop.hive.cli.CliDriver
cli启动
org.apache.hadoop.hive.cli.CliDriver:
1. prepare
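初始化HiveConf,创建CliSessionState,然后启动该会话(session),并创建会话相关的目录。涉及的目录配置如下:
hive.metastore.warehouse.dir:Hive表数据在HDFS中的存储目录
hive.exec.scratchdir:Hive在HDFS上的临时(scratch)根目录,用于存储不同map/reduce阶段的执行计划以及这些阶段的中间输出结果(默认/tmp/hive)
hive.exec.local.scratchdir:Hive作业在本地文件系统上的临时目录(默认/tmp/${user})
hive.downloaded.resources.dir:存放从远程文件系统添加(add)的资源的本地临时目录(默认/tmp/${hive.session.id}_resources)
hive.scratchdir.lock:开启时,会在HDFS会话目录中持有一个锁文件,表示该会话目录正在使用中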
hive.exec.stagingdir → 向Hive的表或分区中插入数据时,首先将数据写入到临时目录,然后再mv(移动)到目标位置, {user.name}/.staging
// "hive.exec.scratchdir", "/tmp/hive", "HDFS root scratch dir for Hive jobs"
// "hive.exec.local.scratchdir", "/tmp/${user}", "Local scratch space for Hive jobs"
// "hive.downloaded.resources.dir", "/tmp/${hive.session.id}_resources", "Temporary local directory for added resources in the remote file system."
// 1. HDFS scratch dir: <HDFS root scratch dir>/<user name>, e.g. hdfs:///tmp/hive/root
path = new Path(rootHDFSDirPath, userName);
hdfsScratchDirURIString = path.toUri().toString();
createPath(conf, path, scratchDirPermission, false, false);
// 2. Local scratch dir (LOCALSCRATCHDIR), e.g. file:///tmp/root
path = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR));
createPath(conf, path, scratchDirPermission, true, false);
// 3. Download resources dir (DOWNLOADED_RESOURCES_DIR),
//    e.g. file:///tmp/2926e14c-6674-4b95-90b4-9f91ca67eb6a_resources
path = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
createPath(conf, path, scratchDirPermission, true, false);
// Finally, create session paths for this session
// Local & non-local tmp location is configurable. however it is the same across
// all external file systems
String sessionId = getSessionId();
// 4. HDFS session path: <HDFS scratch dir>/<sessionId>,
//    e.g. hdfs:///tmp/hive/root/2926e14c-6674-4b95-90b4-9f91ca67eb6a
hdfsSessionPath = new Path(hdfsScratchDirURIString, sessionId);
createPath(conf, hdfsSessionPath, scratchDirPermission, false, true);
conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString());
// 5. Hold a lock file in the HDFS session dir to indicate that it is in use
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK)) {
  FileSystem fs = hdfsSessionPath.getFileSystem(conf);
  // The info file records the owning process ("process: <runtime name>").
  // try-with-resources closes the stream even if writeUTF throws, so it cannot leak.
  try (FSDataOutputStream hdfsSessionPathInfoFile =
      fs.create(new Path(hdfsSessionPath, INFO_FILE_NAME), true)) {
    hdfsSessionPathInfoFile.writeUTF(
        "process: " + ManagementFactory.getRuntimeMXBean().getName() + "\n");
  }
  // The lock-file stream is deliberately NOT closed here. It is kept in the
  // hdfsSessionPathLockFile field, and holding it is what marks the dir as in use.
  hdfsSessionPathLockFile = fs.create(new Path(hdfsSessionPath, LOCK_FILE_NAME), true);
}
// 6. Local session path: <local scratch dir>/<sessionId>,
//    e.g. file:///tmp/root/2926e14c-6674-4b95-90b4-9f91ca67eb6a
localSessionPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR), sessionId);
createPath(conf, localSessionPath, scratchDirPermission, true, true);
conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString());
// 7. HDFS temp table space: <HDFS session path>/<TMP_PREFIX>,
//    e.g. hdfs:///tmp/hive/root/2926e14c-6674-4b95-90b4-9f91ca67eb6a/_tmp_space.db
hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX);
// This is a sub-dir under the hdfsSessionPath. Will be removed along with that dir.
// Don't register with deleteOnExit
createPath(conf, hdfsTmpTableSpace, scratchDirPermission, false, false);
2. init
创建物化视图注册器HiveMaterializedViewsRegistry时,会创建与HiveMetaStore交互的thrift客户端Hive对象,注册函数(function),并获取metastore客户端SessionHiveMetaStoreClient对象(SessionHiveMetaStoreClient被RetryingMetaStoreClient和SynchronizedHandler代理,分别用于重试和同步操作),然后开启socket连接metastore
/**
 * Connects the materialized-view registry to the metastore.
 *
 * <p>A fresh HiveConf is used with the metastore filter hook forced to
 * DefaultMetaStoreFilterHookImpl. This bypasses metastore authorization so that materialized
 * views from all databases can be retrieved. A HiveException while connecting is logged,
 * not propagated.
 */
public void init() {
  HiveConf registryConf = new HiveConf();
  String filterHookKey = MetastoreConf.ConfVars.FILTER_HOOK.getVarname();
  registryConf.set(filterHookKey, DefaultMetaStoreFilterHookImpl.class.getName());
  try {
    init(Hive.get(registryConf));
  } catch (HiveException e) {
    // Best effort: a metastore connection failure only produces an error log entry.
    LOG.error("Problem connecting to the metastore when initializing the view registry", e);
  }
}
3. executeDriver
执行cli工作executeDriver,等待用户输入
// Interactive read loop: read console lines until readLine returns null (EOF). Lines are
// buffered in `prefix` until one ends with an unescaped ';', then the whole statement runs.
while ((line = reader.readLine(curPrompt + "> ")) != null) {
// A multi-line statement is already being buffered: separate it from the next line.
if (!prefix.equals("")) {
prefix += '\n';
}
// Skip "--" comment lines (the newline above has already been appended to prefix).
if (line.trim().startsWith("--")) {
continue;
}
// The line ends with ';' but not with the escaped "\;": the statement is complete.
if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
line = prefix + line;
// Process the complete (buffered) user input through the CLI.
ret = cli.processLine(line, true);
prefix = "";
// Refresh the prompt with the current database, which the statement may have changed.
curDB = getFormattedDb(conf, ss);
curPrompt = prompt + curDB;
dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
} else {
// Statement continues: buffer the line and switch to the continuation prompt.
prefix = prefix + line;
curPrompt = prompt2 + dbSpaces;
continue;
}
}
show databases;
cliDriver处理非sql命令
识别命令类型:
第一层逻辑处理:
- quit/exit: 退出System.exit(0);
- source: 读取文件行,逐行执行
- !开头:执行shell命令
第二层逻辑处理: SET/RESET/DFS/CRYPTO/ADD/LIST/LLAP_CLUSTER/LLAP_CACHE/RELOAD/DELETE/COMPILE:非sql命令处理, 执行相应的processor的run方法,比如set命令执行SetProcessor对象的run方法。
/**
 * Factory: picks the CommandProcessor for the given command tokens.
 *
 * <p>If getForHiveCommand recognizes the command (non-SQL commands such as SET/ADD/DFS), its
 * processor is returned. Otherwise a blank first token yields null, and anything else gets a
 * new Driver from DriverFactory (Driver is itself a CommandProcessor).
 *
 * @param cmd the command split into tokens; cmd[0] is the command word
 * @param conf session configuration
 * @return the processor for the command, or null for a blank command
 */
public static CommandProcessor get(String[] cmd, @Nonnull HiveConf conf) throws SQLException {
  CommandProcessor hiveCommandProcessor = getForHiveCommand(cmd, conf);
  if (hiveCommandProcessor == null) {
    return isBlank(cmd[0]) ? null : DriverFactory.newDriver(conf);
  }
  return hiveCommandProcessor;
}
第三层逻辑处理: sql命令:创建Driver, 先编译Compile,然后运行execute
/**
 * Compiles {@code command} and then executes the compiled plan.
 *
 * @param command the statement to run
 * @return the compile response if compilation failed (non-zero response code),
 *     otherwise the response of the no-arg {@code run()} execution step
 */
public CommandProcessorResponse run(String command) {
  // Step 1: compile.
  CommandProcessorResponse compileResponse = compileAndRespond(command);
  boolean compileFailed = compileResponse.getResponseCode() != 0;
  // Step 2: execute, unless compilation already failed.
  return compileFailed ? compileResponse : run();
}
Compiler
技术栈:antlr/calcite
1. 语法解析
- 执行hive.query.lifetime.hooks的beforeParse方法
- 执行antlr语法解析,生成ASTNode
- 执行hive.query.lifetime.hooks的afterParse方法
// Parse phase: run the before-parse hooks, parse the command into an ASTNode via
// ParseUtils.parse, and always run the after-parse hooks (in finally), telling them whether
// parsing failed. NOTE(review): parseError is declared in the enclosing method (not shown),
// presumably initialized to false.
hookRunner.runBeforeParseHook(command);
ASTNode tree;
try {
tree = ParseUtils.parse(command, ctx);
} catch (ParseException e) {
// Record the failure for the after-parse hook, then rethrow.
parseError = true;
throw e;
} finally {
hookRunner.runAfterParseHook(command, parseError);
}
2. 语义分析
- 执行hive.query.lifetime.hooks的beforeCompile方法
- 执行hive.semantic.analyzer.hook的preAnalyze方法
- 执行语义分析 工厂模式根据输入命令创建对应的语义分析器BaseSemanticAnalyzer, 分析器会分为DDL语句的DDLSemanticAnalyzer、查询query语句的CalcitePlanner和修改删除语句的UpdateDeleteSemanticAnalyzer等,执行分析器的analyze方法
// Pick the semantic analyzer for this AST from the factory (e.g. a DDL analyzer vs. the
// query planner) and run its analysis over the tree.
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
sem.analyze(tree, ctx);
-
show databases等DDL语句会执行DDLSemanticAnalyzer的analyzeShowDatabases方法,创建DDLWork计划(plan)及其对应的DDLTask任务,并创建FetchWork计划及其对应的FetchTask任务
-
select等查询相关sql会执行CalcitePlanner的analyzeInternal方法,下面重点讲解:
2.1 SQL基本组成单元QueryBlock
// genResolvedParseTree walks the AST and fills the query-block (QB / QBParseInfo)
// structures. Per the comments below, it:
// Gets all the aliases for all the tables / subqueries and makes the appropriate mapping in aliasToTabs, aliasToSubq
// Gets the location of the destination and names the clause "inclause" + i
// Creates a map from a string representation of an aggregation tree to the actual aggregation AST
// Creates a mapping from the clause name to the select expression AST in destToSelExpr
// Creates a mapping from a table alias to the lateral view AST's in aliasToLateralViews
// A false result ends the enclosing analysis method early.
if (!genResolvedParseTree(ast, plannerCtx)) {
return;
}
AST Tree仍然非常复杂,不够结构化,不方便直接翻译为MapReduce程序;将AST Tree转化为QueryBlock,就是将SQL进一步抽象和结构化。
QueryBlock是一条SQL最基本的组成单元,包括三个部分:输入源,计算过程,输出。简单来讲一个QueryBlock就是一个子查询。
下图为Hive中QueryBlock相关对象的类图,解释图中几个重要的属性
- QB#aliasToSubq(表示QB类的aliasToSubq属性)保存子查询的QB对象,aliasToSubq key值是子查询的别名
- QB#qbp即QBParseInfo,保存一个基本SQL单元中各个操作部分的AST Tree结构。
- QBParseInfo#nameToDest这个HashMap保存查询单元的输出,key的形式是inclause-i(由于Hive支持Multi Insert语句,所以可能有多个输出),value是对应的ASTNode节点,即TOK_DESTINATION节点。类QBParseInfo其余HashMap属性分别保存输出和各个操作的ASTNode节点的对应关系。
- QBParseInfo#joinExpr保存TOK_JOIN节点。QB#QBJoinTree是对Join语法树的结构化。
- QB#qbm保存每个输入表的元信息,比如表在HDFS上的路径,保存表数据的文件格式等。
- QBExpr这个对象是为了表示Union操作。
AST Tree生成QueryBlock的过程是一个递归的过程,先序遍历AST Tree,遇到不同的Token节点,保存到相应的属性中,主要包含以下几个过程
- TOK_QUERY => 创建QB对象,循环递归子节点
- TOK_FROM => 将表名语法部分保存到QB对象的aliasToTabs等属性中
- TOK_INSERT => 循环递归子节点
- TOK_DESTINATION => 将输出目标的语法部分保存在QBParseInfo对象的nameToDest属性中
- TOK_SELECT => 分别将查询表达式的语法部分保存在destToSelExpr、destToAggregationExprs、destToDistinctFuncExprs三个属性中
- TOK_WHERE => 将Where部分的语法保存在QBParseInfo对象的destToWhereExpr属性中
3. 逻辑操作符Operator
// 2. Gen OP Tree from resolved Parse Tree
// The operator tree built from the resolved parse tree is kept as sinkOp.
Operator sinkOp = genOPTree(ast, plannerCtx);
调用->ASTNode newAST = getOptimizedAST(); 又进入->RelNode optimizedOptiqPlan = logicalPlan();又进入calcitePlanner的apply方法
在apply方法中执行了:
// NOTE: lines containing "->" (and the "a/b = c" line) are call-chain notes (pseudocode),
// not compilable Java.
// Create the Calcite RelOptPlanner.
RelOptPlanner planner = createPlanner(conf, corrScalarRexSQWithAgg, scalarAggNoGbyNoWin);
// Convert the parsed query into Calcite relational-algebra RelNodes.
RelNode calciteGenPlan = genLogicalPlan(getQB(), true, null, null);
RelNode op = genTableLogicalPlan(tableAlias, qb);->tableRel = new HiveTableScan
srcRel = genJoinLogicalPlan -> RelNode filterRel = new HiveFilter等
// Then apply Calcite's relational-algebra optimization passes, etc.
calciteGenPlan = hepPlan/calcitePreCboPlan = applyPreJoinOrderingTransforms
// End of the apply method.
// Generate the logical Operator tree.
sinkOp = genPlan(getQB());
-> Operator op = genTablePlan
遍历上一个过程中生成的QB和QBParseInfo对象的保存语法的属性,包含如下几个步骤:
- QB#aliasToSubq => 有子查询,递归调用
- QB#aliasToTabs => TableScanOperator
- QBParseInfo#joinExpr => QBJoinTree => ReduceSinkOperator + JoinOperator
- QBParseInfo#destToWhereExpr => FilterOperator
- QBParseInfo#destToGroupby => ReduceSinkOperator + GroupByOperator
- QBParseInfo#destToOrderby => ReduceSinkOperator + ExtractOperator
4. 逻辑执行计划优化
大部分逻辑层优化器通过变换OperatorTree,合并操作符,达到减少MapReduce Job,减少shuffle数据量的目的。
// Perform Logical optimization
// The Optimizer is bound to the current ParseContext, initialized from conf, and its
// optimize() result replaces pCtx.
Optimizer optm = new Optimizer();
optm.setPctx(pCtx);
optm.initialize(conf);
pCtx = optm.optimize();
5. 物理执行计划
TaskCompiler is the base class for classes that compile operator pipelines into tasks. OperatorTree转化为MapReduce Job的过程分为下面几个阶段:
- 对输出表生成MoveTask
- 从OperatorTree的其中一个根节点向下深度优先遍历
- ReduceSinkOperator标示Map/Reduce的界限,多个Job间的界限
- 遍历其他根节点,如果碰到JoinOperator则合并MapReduceTask
- 生成StatTask更新元数据
- 剪断Map与Reduce间的Operator的关系
// 9. Optimize Physical op tree & Translate to target execution engine (MR, TEZ..)
// Skipped when ctx.getExplainLogical() is true (EXPLAIN LOGICAL).
if (!ctx.getExplainLogical()) {
TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
compiler.init(queryState, console, db);
compiler.compile(pCtx, rootTasks, inputs, outputs); // With the MR engine this calls MapReduceCompiler.generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs) to build the physical plan
fetchTask = pCtx.getFetchTask();
}
6. 物理执行计划优化
optimizeTaskPlan(rootTasks, pCtx, ctx);
// MapReduceCompiler's implementation: wrap the root tasks in a PhysicalContext and run
// the PhysicalOptimizer over it.
PhysicalContext physicalContext = new PhysicalContext(conf,
getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask());
PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
physicalContext, conf);
physicalOptimizer.optimize();
- 执行hive.semantic.analyzer.hook的postAnalyze方法
- 执行hive.query.lifetime.hooks的afterCompile方法
Executor
- 执行hive.exec.driver.run.hooks的preDriverRun方法
- 执行hive.exec.pre.hooks的run方法
- 执行hive.query.lifetime.hooks的beforeExecution方法
- 获取mr/spark/tez作业job数
// Count the jobs in the plan: MR tasks plus Tez and Spark tasks reachable from the root tasks.
int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
int jobs = mrJobs + Utilities.getTezTasks(plan.getRootTasks()).size()
+ Utilities.getSparkTasks(plan.getRootTasks()).size();
if (jobs > 0) {
// Pass the MR job count to logMrWarning, then print the query id and total job count.
logMrWarning(mrJobs);
console.printInfo("Query ID = " + queryId);
console.printInfo("Total jobs = " + jobs);
}
- 创建DriverContext,遍历queryPlan的rootTasks,执行executeTask方法
// Keep launching tasks while driverCxt.getRunnable(maxthreads) hands one out (null ends the
// loop). Stop early if a launched runner is not running. NOTE(review): presumably this means
// the task ran synchronously or already finished; confirm against launchTask.
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
if (!runner.isRunning()) {
break;
}
}
- 比如DDLTask会执行showDatabases方法,会执行Hive对象的getAllDatabases方法,调用metastoreClient rpc远程请求database数据
private int showDatabases(Hive db, ShowDatabasesDesc showDatabasesDesc) throws HiveException {
// get the databases for the desired pattern - populate the output stream
List<String> databases = null;
if (showDatabasesDesc.getPattern() != null) {
LOG.info("pattern: {}", showDatabasesDesc.getPattern());
databases = db.getDatabasesByPattern(showDatabasesDesc.getPattern());
} else {
databases = db.getAllDatabases();
}
DataOutputStream outStream = getOutputStream(showDatabasesDesc.getResFile());
try {
// 将结果输出到文件里
// [root@hadoop3 root]# cat /tmp/root/2926e14c-6674-4b95-90b4-9f91ca67eb6a/hive_2020-10-22_16-36-33_861_644734059292418218-1/-local-10000
// default
formatter.showDatabases(outStream, databases);
} catch (Exception e) {
throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "show databases");
} finally {
IOUtils.closeStream(outStream);
}
return 0;
- 执行hive.exec.post.hooks的run方法
- 执行hive.query.lifetime.hooks的afterExecution方法
- 执行hive.exec.driver.run.hooks的postDriverRun方法
输出结果
打印文件输出,如果是fetchTask类型,调用fetchTask的fetch方法,会从上文输出文件中按照serde读出
// Print the result header.
printHeader(qp, out);
// print the results
int counter = 0;
try {
if (out instanceof FetchConverter) {
((FetchConverter) out).fetchStarted();
}
// Fetch result batches until getResults returns false, printing each row
// (with CR/LF escaped when escapeCRLF is set).
while (qp.getResults(res)) {
for (String r : res) {
if (escapeCRLF) {
r = EscapeCRLFHelper.escapeCRLF(r);
}
out.println(r);
}
counter += res.size();
res.clear();
// Stop fetching once the output stream reports an error.
if (out.checkError()) {
break;
}
}
} catch (IOException e) {
console.printError("Failed with exception " + e.getClass().getName() + ":" + e.getMessage(),
"\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
// Close the query and release its associated resources.
qp.close();