hive-4. Source code: client


Build

# Build Hive
mvn clean package -DskipTests -Phadoop-2 -Pdist
# Packaged distribution directory
cd packaging/target/apache-hive-{version}-SNAPSHOT-bin/apache-hive-{version}-SNAPSHOT-bin

schematool

  1. schematool -initSchema -dbType mysql (class: HiveSchemaTool); the main method runs the schema initialization SQL, e.g. scripts/metastore/upgrade/mysql/hive-schema-2.3.0.mysql.sql
public void doInit(String toVersion) throws HiveMetaException {
    testConnectionToMetastore();
    System.out.println("Starting metastore schema initialization to " + toVersion);

    String initScriptDir = metaStoreSchemaInfo.getMetaStoreScriptDir();
    String initScriptFile = metaStoreSchemaInfo.generateInitFileName(toVersion);

    try {
        System.out.println("Initialization script " + initScriptFile);
        if (!dryRun) {
            runBeeLine(initScriptDir, initScriptFile);
            System.out.println("Initialization script completed");
        }
    } catch (IOException e) {
        throw new HiveMetaException("Schema initialization FAILED!" +
            " Metastore state would be inconsistent !!", e);
    }
}
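
The script file name that doInit resolves comes from the target schema version and the database type. Below is a minimal sketch of the naming convention implied by the example above (a hypothetical helper, not the actual MetaStoreSchemaInfo implementation):

// Hypothetical sketch of the "hive-schema-<version>.<dbType>.sql" naming rule
// suggested by the example script above; not the real MetaStoreSchemaInfo code.
public class InitScriptNameSketch {
  static String generateInitFileName(String toVersion, String dbType) {
    // e.g. toVersion = "2.3.0", dbType = "mysql" -> "hive-schema-2.3.0.mysql.sql"
    return "hive-schema-" + toVersion + "." + dbType + ".sql";
  }

  public static void main(String[] args) {
    System.out.println(generateInitFileName("2.3.0", "mysql"));
  }
}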

hive-client

hive --debug --hiveconf hive.root.logger=INFO,console (class: org.apache.hive.beeline.cli.HiveCli)

Tech stack

serde antlr

Launch command

java -Xmx2048m -Djava.net.preferIPv4Stack=true -Duser.name=mart_dos 
-Djava.util.logging.config.file=/software/servers/cloudeco/mart_dos/hive/conf/parquet-logging.properties 
-Dhadoop.log.dir=/data0/hadoop-logs/ -Dhadoop.log.file=hadoop.log 
-Dhadoop.home.dir=/data0/hadoop/hadoop_2.100.52_2020081716 
-Dhadoop.id.str=mart_dos 
-Dhadoop.root.logger=INFO,console 
-Djava.library.path=/lib64/::/data0/hadoop/hadoop_2.100.52_2020081716/lib/native 
-Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true 
-Djava.util.logging.config.file=/software/servers/cloudeco/mart_dos/hive/conf/parquet-logging.properties 
-Dhadoop.security.logger=INFO,NullAppender 
org.apache.hadoop.util.RunJar 
/software/servers/cloudeco/mart_dos/hive/lib/hive-cli-2.0.0-SNAPSHOT.jar 
org.apache.hadoop.hive.cli.CliDriver

CLI startup

org.apache.hadoop.hive.cli.CliDriver:

1. prepare

Initialize the HiveConf and create a CliSessionState, then open the session, which creates the corresponding session directories. Relevant configuration: hive.metastore.warehouse.dir: the directory in HDFS where Hive data is stored

hive.exec.scratchdir: Hive's scratch directory in HDFS, used to store the execution plans of the different map/reduce stages and the intermediate output of those stages
hive.exec.local.scratchdir: local scratch space for Hive jobs
hive.downloaded.resources.dir: temporary local directory for resources added during the session; hive.scratchdir.lock: whether to hold a lock file in the HDFS session directory; hive.exec.stagingdir: when inserting into a Hive table or partition, data is first written to a staging directory and then moved (mv) into place, e.g. {hive.exec.scratchdir}/{user.name}/.staging

// "hive.exec.scratchdir", "/tmp/hive", "HDFS root scratch dir for Hive jobs"
// "hive.exec.local.scratchdir", "/tmp/${user}", "Local scratch space for Hive jobs"
// "hive.downloaded.resources.dir", "/tmp/${hive.session.id}_resources", "Temporary local directory for added resources in the remote file system."
// 1. HDFS scratch dir           hdfs:///tmp/hive/root                                                       
path = new Path(rootHDFSDirPath, userName);                                    
hdfsScratchDirURIString = path.toUri().toString();
createPath(conf, path, scratchDirPermission, false, false);
// 2. Local scratch dir          file:///tmp/root
path = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR));
createPath(conf, path, scratchDirPermission, true, false);
// 3. Download resources dir      file:///tmp/2926e14c-6674-4b95-90b4-9f91ca67eb6a_resources
path = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
createPath(conf, path, scratchDirPermission, true, false);
// Finally, create session paths for this session
// Local & non-local tmp location is configurable. however it is the same across
// all external file systems
String sessionId = getSessionId();
// 4. HDFS session path          hdfs:///tmp/hive/root/2926e14c-6674-4b95-90b4-9f91ca67eb6a
hdfsSessionPath = new Path(hdfsScratchDirURIString, sessionId);
createPath(conf, hdfsSessionPath, scratchDirPermission, false, true);
conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString());
// 5. hold a lock file in HDFS session dir to indicate that it is in use
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK)) {
  FileSystem fs = hdfsSessionPath.getFileSystem(conf);
  FSDataOutputStream hdfsSessionPathInfoFile = fs.create(new Path(hdfsSessionPath, INFO_FILE_NAME),
      true);
  hdfsSessionPathInfoFile.writeUTF("process: " + ManagementFactory.getRuntimeMXBean().getName()
      +"\n");
  hdfsSessionPathInfoFile.close();
  hdfsSessionPathLockFile = fs.create(new Path(hdfsSessionPath, LOCK_FILE_NAME), true);
}
// 6. Local session path         file:///tmp/root/2926e14c-6674-4b95-90b4-9f91ca67eb6a
localSessionPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR), sessionId);
createPath(conf, localSessionPath, scratchDirPermission, true, true);
conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString());
// 7. HDFS temp table space      hdfs:///tmp/hive/root/2926e14c-6674-4b95-90b4-9f91ca67eb6a/_tmp_space.db
hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX);
// This is a sub-dir under the hdfsSessionPath. Will be removed along with that dir.
// Don't register with deleteOnExit
createPath(conf, hdfsTmpTableSpace, scratchDirPermission, false, false);

2. init

When the materialized view registry HiveMaterializedViewsRegistry is created, it creates the Hive object that acts as the Thrift client to HiveMetaStore, registers the functions, and obtains the metastore client SessionHiveMetaStoreClient (which is wrapped by RetryingMetaStoreClient and SynchronizedHandler proxies for retry and synchronized access), opening a socket connection to the metastore. (A minimal sketch of this retry-proxy idea follows the code below.)

  public void init() {
    try {
      // Create a new conf object to bypass metastore authorization, as we need to
      // retrieve all materialized views from all databases
      HiveConf conf = new HiveConf();
      conf.set(MetastoreConf.ConfVars.FILTER_HOOK.getVarname(),
          DefaultMetaStoreFilterHookImpl.class.getName());
      init(Hive.get(conf));
    } catch (HiveException e) {
      LOG.error("Problem connecting to the metastore when initializing the view registry", e);
    }
  }
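
For intuition, here is a minimal, self-contained sketch of the retry-proxy pattern described above. It is illustrative only: the interface and retry policy below are made up, and this is not Hive's RetryingMetaStoreClient.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Illustrative only: a toy client interface standing in for IMetaStoreClient.
interface ToyMetaStoreClient {
  java.util.List<String> getAllDatabases() throws Exception;
}

// Minimal retry handler, conceptually similar to what RetryingMetaStoreClient does:
// delegate every call and retry a fixed number of times on failure.
class RetryingHandler implements InvocationHandler {
  private final ToyMetaStoreClient delegate;
  private final int maxRetries;

  RetryingHandler(ToyMetaStoreClient delegate, int maxRetries) {
    this.delegate = delegate;
    this.maxRetries = maxRetries;
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Throwable last = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return method.invoke(delegate, args);
      } catch (InvocationTargetException e) {
        last = e.getCause();   // unwrap the failure and retry
      }
    }
    throw last;
  }

  static ToyMetaStoreClient wrap(ToyMetaStoreClient delegate, int maxRetries) {
    return (ToyMetaStoreClient) Proxy.newProxyInstance(
        ToyMetaStoreClient.class.getClassLoader(),
        new Class<?>[] { ToyMetaStoreClient.class },
        new RetryingHandler(delegate, maxRetries));
  }
}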

3. executeDriver

executeDriver runs the CLI's main loop and waits for user input

while ((line = reader.readLine(curPrompt + "> ")) != null) {
      if (!prefix.equals("")) {
        prefix += '\n';
      }
      if (line.trim().startsWith("--")) {
        continue;
      }
      if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
        line = prefix + line;
        // process the user's input line by line
        ret = cli.processLine(line, true);
        prefix = "";
        curDB = getFormattedDb(conf, ss);
        curPrompt = prompt + curDB;
        dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
      } else {
        prefix = prefix + line;
        curPrompt = prompt2 + dbSpaces;
        continue;
      }
    }

show databases;

How CliDriver handles non-SQL commands

Identifying the command type:

First layer of processing:

  • quit/exit: exit via System.exit(0)
  • source: read the file and execute it line by line
  • lines starting with !: run as a shell command

Second layer of processing: SET/RESET/DFS/CRYPTO/ADD/LIST/LLAP_CLUSTER/LLAP_CACHE/RELOAD/DELETE/COMPILE: these non-SQL commands are handled by the run method of the corresponding processor; for example, the set command runs the SetProcessor object's run method.

  // Factory pattern: create the CommandProcessor that matches the input command; the Driver class also derives from CommandProcessor
  public static CommandProcessor get(String[] cmd, @Nonnull HiveConf conf) throws SQLException {
    CommandProcessor result = getForHiveCommand(cmd, conf);
    if (result != null) {
      return result;
    }
    if (isBlank(cmd[0])) {
      return null;
    } else {
      return DriverFactory.newDriver(conf);
    }
  }
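
As a simplified, self-contained model of this two-level dispatch (illustrative only; the interface and classes below are made up and are not Hive's CommandProcessor/SetProcessor/Driver):

import java.util.Map;

// Toy model of the dispatch: named commands go to dedicated processors,
// everything else falls through to a SQL "driver".
interface ToyProcessor {
  int run(String args);
}

class ToySetProcessor implements ToyProcessor {
  public int run(String args) {
    System.out.println("would set: " + args);
    return 0;
  }
}

class ToySqlDriver implements ToyProcessor {
  public int run(String sql) {
    System.out.println("would compile and execute: " + sql);
    return 0;
  }
}

class ToyProcessorFactory {
  private static final Map<String, ToyProcessor> NAMED =
      Map.of("set", new ToySetProcessor());

  // Named (non-SQL) command -> dedicated processor; otherwise the SQL driver.
  static ToyProcessor get(String firstToken) {
    return NAMED.getOrDefault(firstToken.toLowerCase(), new ToySqlDriver());
  }

  public static void main(String[] args) {
    ToyProcessorFactory.get("set").run("hive.exec.parallel=true");
    ToyProcessorFactory.get("select").run("select * from t");
  }
}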

LanguageManual Commands

Third layer of processing: SQL statements: a Driver is created, which first compiles (compile) and then runs (execute) the query.

  public CommandProcessorResponse run(String command) {
  // compile
    CommandProcessorResponse r0 = compileAndRespond(command);
    if (r0.getResponseCode() != 0) {
      return r0;
    }
    // execute
    return run();
  }

Compiler

Tech stack: antlr / calcite


1. Syntax parsing

  • run the beforeParse methods of the hive.query.lifetime.hooks
  • run ANTLR parsing to produce an ASTNode
  • run the afterParse methods of the hive.query.lifetime.hooks

hookRunner.runBeforeParseHook(command);

boolean parseError = false;
ASTNode tree;
try {
  tree = ParseUtils.parse(command, ctx);
} catch (ParseException e) {
  parseError = true;
  throw e;
} finally {
  hookRunner.runAfterParseHook(command, parseError);
}

2. Semantic analysis

  • run the beforeCompile methods of the hive.query.lifetime.hooks
  • run the preAnalyze methods of the hive.semantic.analyzer.hook
  • run semantic analysis: a factory creates the BaseSemanticAnalyzer matching the statement; analyzers include DDLSemanticAnalyzer for DDL statements, CalcitePlanner for query statements, UpdateDeleteSemanticAnalyzer for update/delete statements, and so on; the analyzer's analyze method is then invoked

BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
sem.analyze(tree, ctx);
  1. DDL statements such as show databases go through DDLSemanticAnalyzer's analyzeShowDatabases method, which creates a DDLWork plan with its corresponding DDLTask, plus a FetchWork plan with its corresponding FetchTask

  2. Query SQL such as select goes through CalcitePlanner's analyzeInternal method, covered in detail below:

2.1 QueryBlock, the basic unit of a SQL statement

    // Gets all the aliases for all the tables / subqueries and makes the appropriate mapping in aliasToTabs, aliasToSubq 
    // Gets the location of the destination and names the clause "inclause" + i 
    // Creates a map from a string representation of an aggregation tree to the actual aggregation AST
    // Creates a mapping from the clause name to the select expression AST in destToSelExpr 
    // Creates a mapping from a table alias to the lateral view AST's in aliasToLateralViews
    if (!genResolvedParseTree(ast, plannerCtx)) {
      return;
    }

The AST tree is still very complex and not structured enough to be translated directly into a MapReduce program. Converting the AST tree into QueryBlocks abstracts and structures the SQL one step further.

A QueryBlock is the most basic unit of a SQL statement and has three parts: the input source, the computation, and the output. Roughly speaking, one QueryBlock is one subquery.

The class diagram below shows the QueryBlock-related classes in Hive; the important attributes are:

[Figure: class diagram of Hive's QueryBlock-related classes]

  • QB#aliasToSubq (i.e. the aliasToSubq attribute of the QB class) holds the QB objects of subqueries, keyed by the subquery alias
  • QB#qbp, i.e. QBParseInfo, holds the AST subtree of each operation clause within one basic SQL unit
  • QBParseInfo#nameToDest is a HashMap holding the outputs of the query unit; keys have the form inclause-i (since Hive supports multi-insert statements, there can be several outputs) and values are the corresponding ASTNode, i.e. the TOK_DESTINATION node. The remaining HashMap attributes of QBParseInfo map the outputs to the ASTNodes of the individual operations.
  • QBParseInfo#joinExpr holds the TOK_JOIN node; QB#QBJoinTree is a structured form of the join syntax tree
  • QB#qbm holds the metadata of each input table, e.g. the table's path on HDFS and the file format of the table data
  • QBExpr represents a Union operation

Generating QueryBlocks from the AST tree is a recursive process: the AST tree is walked in pre-order and, depending on the token encountered, the node is saved into the corresponding attribute. The main steps are (see the sketch after this list):

- TOK_QUERY => create a QB object and recurse into the child nodes
- TOK_FROM => save the table-name part of the syntax into QB attributes such as aliasToTabs
- TOK_INSERT => recurse into the child nodes
- TOK_DESTINATION => save the output-target syntax into the nameToDest attribute of the QBParseInfo object
- TOK_SELECT => save the select-expression syntax into the destToSelExpr, destToAggregationExprs and destToDistinctFuncExprs attributes
- TOK_WHERE => save the where-clause syntax into the destToWhereExpr attribute of the QBParseInfo object
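
A minimal, self-contained sketch of this pre-order dispatch follows. The node class, the token names as strings, and the QB-like holder are simplified stand-ins, not Hive's ASTNode/QB/doPhase1:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for ASTNode and QB; illustrative only.
class ToyNode {
  final String token;                       // e.g. "TOK_FROM"
  final List<ToyNode> children = new ArrayList<>();
  ToyNode(String token) { this.token = token; }
}

class ToyQB {
  final Map<String, ToyNode> aliasToTabs = new HashMap<>();
  final Map<String, ToyNode> nameToDest = new HashMap<>();
  final Map<String, ToyNode> destToSelExpr = new HashMap<>();
  final Map<String, ToyNode> destToWhereExpr = new HashMap<>();
}

class ToyPhase1 {
  // Pre-order walk: handle the current token, then recurse into the children.
  static void doPhase1(ToyNode node, ToyQB qb, String clause) {
    switch (node.token) {
      case "TOK_FROM":        qb.aliasToTabs.put("t", node);        break;
      case "TOK_DESTINATION": qb.nameToDest.put(clause, node);      break;
      case "TOK_SELECT":      qb.destToSelExpr.put(clause, node);   break;
      case "TOK_WHERE":       qb.destToWhereExpr.put(clause, node); break;
      default:                /* TOK_QUERY, TOK_INSERT, ...: just recurse */ break;
    }
    for (ToyNode child : node.children) {
      doPhase1(child, qb, clause);
    }
  }
}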

3. Logical operators (Operator)

    // 2. Gen OP Tree from resolved Parse Tree
    Operator sinkOp = genOPTree(ast, plannerCtx);

This calls ASTNode newAST = getOptimizedAST(), which calls RelNode optimizedOptiqPlan = logicalPlan(), which in turn enters CalcitePlanner's apply method.

Inside the apply method the following happens:

// create the RelOptPlanner
RelOptPlanner planner = createPlanner(conf, corrScalarRexSQWithAgg, scalarAggNoGbyNoWin);
// convert the ASTNode into Calcite relational algebra (RelNode)
RelNode calciteGenPlan = genLogicalPlan(getQB(), true, null, null);
//   RelNode op = genTableLogicalPlan(tableAlias, qb) -> tableRel = new HiveTableScan(...)
//   srcRel = genJoinLogicalPlan(...)  -> RelNode filterRel = new HiveFilter(...), etc.
// then run Calcite's relational-algebra optimizations:
//   calciteGenPlan = hepPlan(...) / calcitePreCboPlan = applyPreJoinOrderingTransforms(...)
// end of the apply method

// generate the logical Operator tree
sinkOp = genPlan(getQB());
//   -> Operator op = genTablePlan(...)

The QB and QBParseInfo attributes populated in the previous step are then traversed to generate the operator tree; the main steps are (see the sketch after this list):

  • QB#aliasToSubq => subquery present: recurse
  • QB#aliasToTabs => TableScanOperator
  • QBParseInfo#joinExpr => QBJoinTree => ReduceSinkOperator + JoinOperator
  • QBParseInfo#destToWhereExpr => FilterOperator
  • QBParseInfo#destToGroupby => ReduceSinkOperator + GroupByOperator
  • QBParseInfo#destToOrderby => ReduceSinkOperator + ExtractOperator
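
A minimal sketch of this mapping step, using simplified stand-in classes rather than Hive's Operator/QB hierarchy (illustrative only):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified operator node; Hive's Operator<? extends OperatorDesc> is far richer.
class ToyOperator {
  final String name;
  final List<ToyOperator> children = new ArrayList<>();
  ToyOperator(String name) { this.name = name; }
  ToyOperator then(ToyOperator next) { children.add(next); return next; }
}

class ToyPlanGen {
  // Walk the (toy) query block and emit operators in the order described above:
  // table scans, then a filter, then a group-by behind a reduce sink, etc.
  static ToyOperator genPlan(Map<String, String> aliasToTabs,
                             boolean hasWhere, boolean hasGroupBy) {
    ToyOperator root = new ToyOperator("TableScanOperator(" + aliasToTabs.keySet() + ")");
    ToyOperator tail = root;
    if (hasWhere) {
      tail = tail.then(new ToyOperator("FilterOperator"));
    }
    if (hasGroupBy) {
      tail = tail.then(new ToyOperator("ReduceSinkOperator"));
      tail = tail.then(new ToyOperator("GroupByOperator"));
    }
    tail.then(new ToyOperator("FileSinkOperator"));
    return root;
  }
}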

4. Logical plan optimization

Most logical-layer optimizers work by transforming the OperatorTree and merging operators, reducing the number of MapReduce jobs and the amount of data shuffled. (A simplified model of the optimizer's rule loop follows the code below.)

// Perform Logical optimization
Optimizer optm = new Optimizer();
optm.setPctx(pCtx);
optm.initialize(conf);
pCtx = optm.optimize();
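
Conceptually, the Optimizer holds a list of transformation rules and applies each one to the plan context in turn. Below is a minimal model of that loop (illustrative only; the rule interface and classes here are made up, not Hive's Transform implementations):

import java.util.ArrayList;
import java.util.List;

// Toy model of a rule-based logical optimizer: each rule rewrites the plan
// context and hands it to the next rule.
interface ToyRule {
  ToyPlanContext transform(ToyPlanContext ctx);
}

class ToyPlanContext {
  // In Hive this would be the ParseContext holding the operator tree, etc.
  String description = "initial operator tree";
}

class ToyOptimizer {
  private final List<ToyRule> rules = new ArrayList<>();

  void initialize() {
    // Register rules conditionally, similar to how Hive checks HiveConf flags
    // (e.g. predicate pushdown, column pruning) before adding a rule.
    rules.add(ctx -> { ctx.description += " + predicate pushdown"; return ctx; });
    rules.add(ctx -> { ctx.description += " + column pruning";     return ctx; });
  }

  ToyPlanContext optimize(ToyPlanContext ctx) {
    for (ToyRule rule : rules) {
      ctx = rule.transform(ctx);
    }
    return ctx;
  }
}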

5. Physical plan generation

TaskCompiler is the base class for classes that compile operator pipelines into tasks. Turning the OperatorTree into MapReduce jobs goes through the following stages (a simplified sketch follows the code below):

  • generate a MoveTask for each output table
  • depth-first traversal starting from one root node of the OperatorTree
  • ReduceSinkOperator marks the map/reduce boundary and the boundary between jobs
  • traverse the remaining root nodes, merging MapReduceTasks when a JoinOperator is encountered
  • generate a StatsTask to update metadata
  • cut the Operator links between the map and reduce sides

// 9. Optimize Physical op tree & Translate to target execution engine (MR, TEZ..)
if (!ctx.getExplainLogical()) {
  TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
  compiler.init(queryState, console, db);
  compiler.compile(pCtx, rootTasks, inputs, outputs); // calls MapReduceCompiler's generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs) to build the physical plan
  fetchTask = pCtx.getFetchTask();
}
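
A minimal sketch of the key idea, breaking the operator chain into tasks at every ReduceSinkOperator (illustrative only; real task generation in MapReduceCompiler/GenMapRedUtils handles joins, multi-insert, and much more):

import java.util.ArrayList;
import java.util.List;

// Toy model: walk a linear operator chain and start a new "task" every time a
// reduce sink (the map/reduce and job boundary) is crossed.
class ToyTaskBreaker {
  static List<List<String>> breakIntoTasks(List<String> operatorChain) {
    List<List<String>> tasks = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (String op : operatorChain) {
      current.add(op);
      if (op.equals("ReduceSinkOperator")) {
        // Boundary reached: close the current task; the downstream operators
        // belong to the reduce side / next job.
        tasks.add(current);
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      tasks.add(current);
    }
    return tasks;
  }

  public static void main(String[] args) {
    System.out.println(breakIntoTasks(List.of(
        "TableScanOperator", "FilterOperator", "ReduceSinkOperator",
        "GroupByOperator", "FileSinkOperator")));
  }
}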

6. Physical plan optimization

optimizeTaskPlan(rootTasks, pCtx, ctx);
// MapReduceCompiler implementation
PhysicalContext physicalContext = new PhysicalContext(conf,
        getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask());
PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
    physicalContext, conf);
physicalOptimizer.optimize();

  • run the postAnalyze methods of hive.semantic.analyzer.hook
  • run the afterCompile methods of hive.query.lifetime.hooks

Executor

  • run the preDriverRun methods of hive.exec.driver.run.hooks
  • run the run methods of hive.exec.pre.hooks
  • run the beforeExecution methods of hive.query.lifetime.hooks
  • get the number of MR/Spark/Tez jobs:
  int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
  int jobs = mrJobs + Utilities.getTezTasks(plan.getRootTasks()).size()
      + Utilities.getSparkTasks(plan.getRootTasks()).size();
  if (jobs > 0) {
    logMrWarning(mrJobs);
    console.printInfo("Query ID = " + queryId);
    console.printInfo("Total jobs = " + jobs);
  }
  • create the DriverContext, traverse the queryPlan's rootTasks, and run each via the executeTask method (a simplified model of this launch loop is sketched at the end of this section)
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
  TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
  if (!runner.isRunning()) {
    break;
  }
}
  • for example, DDLTask executes its showDatabases method, which calls the Hive object's getAllDatabases method, which in turn issues a metastore client RPC to fetch the database data
  private int showDatabases(Hive db, ShowDatabasesDesc showDatabasesDesc) throws HiveException {
    // get the databases for the desired pattern - populate the output stream
    List<String> databases = null;
    if (showDatabasesDesc.getPattern() != null) {
      LOG.info("pattern: {}", showDatabasesDesc.getPattern());
      databases = db.getDatabasesByPattern(showDatabasesDesc.getPattern());
    } else {
      databases = db.getAllDatabases();
    }
    DataOutputStream outStream = getOutputStream(showDatabasesDesc.getResFile());
    try {
      // write the result to the output file
      // [root@hadoop3 root]# cat /tmp/root/2926e14c-6674-4b95-90b4-9f91ca67eb6a/hive_2020-10-22_16-36-33_861_644734059292418218-1/-local-10000
      // default
      formatter.showDatabases(outStream, databases);
    } catch (Exception e) {
      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "show databases");
    } finally {
      IOUtils.closeStream(outStream);
    }
    return 0;
  }

  • run the run methods of hive.exec.post.hooks
  • run the afterExecution methods of hive.query.lifetime.hooks
  • run the postDriverRun methods of hive.exec.driver.run.hooks
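
As referenced above, the task launch loop repeatedly asks for tasks whose dependencies have completed and hands them to runners, up to a thread limit. A minimal model of that scheduling idea (illustrative only; not Hive's DriverContext/TaskRunner):

import java.util.ArrayDeque;
import java.util.Deque;

// Toy scheduling loop: pull runnable tasks until none are left, bounded by a
// maximum number of concurrently running tasks.
class ToyScheduler {
  private final Deque<Runnable> runnable = new ArrayDeque<>();
  private int running = 0;

  void submit(Runnable task) { runnable.add(task); }

  // Analogous to driverCxt.getRunnable(maxthreads): return null when either
  // nothing is runnable or the concurrency limit is reached.
  Runnable getRunnable(int maxThreads) {
    if (running >= maxThreads || runnable.isEmpty()) {
      return null;
    }
    running++;
    return runnable.poll();
  }

  void launchLoop(int maxThreads) {
    Runnable task;
    while ((task = getRunnable(maxThreads)) != null) {
      task.run();      // Hive instead starts a TaskRunner (possibly in parallel)
      running--;       // and decrements when the runner finishes
    }
  }
}
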
Output of the results

The output file is printed; if there is a fetchTask, its fetch method is called, which reads the rows back from the output file above using the SerDe.

// print the result header
printHeader(qp, out);
// print the results
int counter = 0;
try {
  if (out instanceof FetchConverter) {
    ((FetchConverter) out).fetchStarted();
  }
  // fetch the results
  while (qp.getResults(res)) {
    for (String r : res) {
      if (escapeCRLF) {
        r = EscapeCRLFHelper.escapeCRLF(r);
      }
      out.println(r);
    }
    counter += res.size();
    res.clear();
    if (out.checkError()) {
      break;
    }
  }
} catch (IOException e) {
  console.printError("Failed with exception " + e.getClass().getName() + ":" + e.getMessage(),
      "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
  ret = 1;
}
// close the query and release its associated resources
qp.close();
