数据库中间件 MyCAT 源码分析 —— 【单库单表】插入

1,737 阅读7分钟
原文链接: www.iocoder.cn
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/MyCAT/single-db-single-table-insert/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 MyCAT 1.6.5 正式版



1. 概述

内容形态以 顺序图 + 核心代码 为主。
如果有地方表述不错误或者不清晰,欢迎留言。
对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。

本文讲解 【单库单表】插入 所涉及到的代码。交互如下图:

单库单表插入简图

整个过程,MyCAT Server 流程如下:

  1. 接收 MySQL Client 请求,解析 SQL。
  2. 获得路由结果,进行路由。
  3. 获得 MySQL 连接,执行 SQL。
  4. 响应执行结果,发送结果给 MySQL Client。

我们逐个步骤分析,一起来看看源码。

2. 接收请求,解析 SQL

【单库单表】插入(01主流程)

【 1 - 2 】

接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

【 3 】

不同 MySQL 命令,分发到不同的方法执行。核心代码如下:

 1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】
 2: public class FrontendCommandHandler implements NIOHandler {
 3: 
 4:     @Override
 5:     public void handle(byte[] data) {
 6:     
 7:         // .... 省略部分代码
 8:         switch (data[4]) // 
 9:         {
10:             case MySQLPacket.COM_INIT_DB:
11:                 commands.doInitDB();
12:                 source.initDB(data);
13:                 break;
14:             case MySQLPacket.COM_QUERY: // 查询命令
15:                 // 计数查询命令
16:                 commands.doQuery();
17:                 // 执行查询命令
18:                 source.query(data);
19:                 break;
20:             case MySQLPacket.COM_PING:
21:                 commands.doPing();
22:                 source.ping();
23:                 break;
24:             // .... 省略部分case
25:         }
26:     }
27: 
28: }

INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》

##【 4 】

将 二进制数组 解析成 SQL。核心代码如下:

 1: // ⬇️⬇️⬇️【FrontendConnection.java】
 2: public void query(byte[] data) {
 3: 	// 取得语句
 4: 	String sql = null;		
 5: 	try {
 6: 		MySQLMessage mm = new MySQLMessage(data);
 7: 		mm.position(5);
 8: 		sql = mm.readString(charset);
 9: 	} catch (UnsupportedEncodingException e) {
10: 		writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
11: 		return;
12: 	}		
13: 	// 执行语句
14: 	this.query( sql );
15: }

##【 5 】

解析 SQL 类型。核心代码如下:

 1: // ⬇️⬇️⬇️【ServerQueryHandler.java】
 2: @Override
 3: public void query(String sql) {
 4: 	// 解析 SQL 类型
 5: 	int rs = ServerParse.parse(sql);
 6: 	int sqlType = rs & 0xff;
 7: 	
 8: 	switch (sqlType) {
 9: 	//explain sql
10: 	case ServerParse.EXPLAIN:
11: 		ExplainHandler.handle(sql, c, rs >>> 8);
12: 		break;
13: 	// .... 省略部分case
14: 		break;
15: 	case ServerParse.SELECT:
16: 		SelectHandler.handle(sql, c, rs >>> 8);
17: 		break;
18: 	// .... 省略部分case
19: 	default:
20: 		if(readOnly){
21: 			LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
22: 			c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
23: 			break;
24: 		}
25: 		c.execute(sql, rs & 0xff);
26: 	}
27: }
28: 
29:
30: // ⬇️⬇️⬇️【ServerParse.java】
31: public static int parse(String stmt) {
32: 	int length = stmt.length();
33: 	//FIX BUG FOR SQL SUCH AS /XXXX/SQL
34: 	int rt = -1;
35: 	for (int i = 0; i < length; ++i) {
36: 		switch (stmt.charAt(i)) {
37: 		// .... 省略部分case			case 'I':
38: 		case 'i':
39: 			rt = insertCheck(stmt, i);
40: 			if (rt != OTHER) {
41: 				return rt;
42: 			}
43: 			continue;
44: 			// .... 省略部分case
45: 		case 'S':
46: 		case 's':
47: 			rt = sCheck(stmt, i);
48: 			if (rt != OTHER) {
49: 				return rt;
50: 			}
51: 			continue;
52: 			// .... 省略部分case
53: 		default:
54: 			continue;
55: 		}
56: 	}
57: 	return OTHER;
58: }

##【 6 】

执行 SQL,详细解析见下文,核心代码如下:

 1: // ⬇️⬇️⬇️【ServerConnection.java】
 2: public class ServerConnection extends FrontendConnection {
 3: 	public void execute(String sql, int type) {
 4: 		// .... 省略代码
 5: 		SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
 6: 		if (schema == null) {
 7: 			writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
 8: 					"Unknown MyCAT Database '" + db + "'");
 9: 			return;
10: 		}
11: 
12: 		// .... 省略代码
13: 
14: 		// 路由到后端数据库,执行 SQL
15: 		routeEndExecuteSQL(sql, type, schema);
16: 	}
17: 	
18:     public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
19: 		// 路由计算
20: 		RouteResultset rrs = null;
21: 		try {
22: 			rrs = MycatServer
23: 					.getInstance()
24: 					.getRouterservice()
25: 					.route(MycatServer.getInstance().getConfig().getSystem(),
26: 							schema, type, sql, this.charset, this);
27: 
28: 		} catch (Exception e) {
29: 			StringBuilder s = new StringBuilder();
30: 			LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
31: 			String msg = e.getMessage();
32: 			writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
33: 			return;
34: 		}
35: 
36: 		// 执行 SQL
37: 		if (rrs != null) {
38: 			// session执行
39: 			session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
40: 		}
41: 		
42:  	}
43: 
44: }

3. 获得路由结果

【单库单表】插入(02获取路由)

【 1 - 2 】【 12 】

获得路由主流程。核心代码如下:

 1: // ⬇️⬇️⬇️【RouteService.java】
 2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
 3: 		int sqlType, String stmt, String charset, ServerConnection sc)
 4: 		throws SQLNonTransientException {
 5: 	RouteResultset rrs = null;
 6: 	// .... 省略代码
 7: 	int hintLength = RouteService.isHintSql(stmt);
 8: 	if(hintLength != -1){ // TODO 待读:hint
 9: 		// .... 省略代码
10: 		}
11: 	} else {
12: 		stmt = stmt.trim();
13: 		rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
14: 				charset, sc, tableId2DataNodeCache);
15: 	}
16: 
17: 	// .... 省略代码		return rrs;
18: }
19: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
20: @Override
21: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
22: 		String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
23: 
24: 	// .... 省略代码
25: 
26: 	// 处理一些路由之前的逻辑;全局序列号,父子表插入
27: 	if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
28: 		return null;
29: 	}
30: 
31: 	// .... 省略代码
32: 
33: 	// 检查是否有分片
34: 	if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
35: 		rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
36: 	} else {
37: 		RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
38: 		if (returnedSet == null) {
39: 			rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
40: 		}
41: 	}
42: 
43: 	return rrs;
44: }

路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

【 3 - 6 】

路由前置处理。当符合如下三种情况下,进行处理:

{ 1 } 使用全局序列号

insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')

{ 2 } ER 子表插入
{ 3 } 主键使用自增 ID 插入:

insert into table (name) values ('name')
===>
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
```  

情况 { 1 } { 3 } 情况类似,使用全局序列号。

核心代码如下:

```Java
  1: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
  2: private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)
  3: 		throws SQLNonTransientException {
  4: 	return  // 处理 id 使用 全局序列号
  5:             RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)
  6:             // 处理 ER 子表
  7: 			|| (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))
  8:             // 处理 id 自增长
  9: 			|| (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));
 10: }

RouterUtil.java 处理 SQL 考虑性能,实现会比较 C-style,代码咱就不贴了,传送门:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/route/util/RouterUtil.java。 (😈该仓库从官方 Fork,逐步完善中文注释,欢迎 Star)

【 7 - 11 】

前置路由处理全局序列号时,添加到全局序列处理器(MyCATSequnceProcessor)。该处理器会异步生成 ID,替换 SQL 内的 NEXT VALUE FOR MYCATSEQ_ 正则。例如:

insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
===>
insert into table (id, name) values (868348974560579584, 'name')

异步处理完后,调用 ServerConnection#routeEndExecuteSQL(sql, type, schema) 方法重新执行 SQL。

核心代码如下:

 1: // ⬇️⬇️⬇️【RouterUtil.java】
 2: public static void processSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType){
 3: 	SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);
 4: 	MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);
 5: }
 6: // ⬇️⬇️⬇️【MyCATSequnceProcessor.java】
 7: public class MyCATSequnceProcessor {
 8: 	private LinkedBlockingQueue<SessionSQLPair> seqSQLQueue = new LinkedBlockingQueue<SessionSQLPair>();
 9: 	private volatile boolean running=true;
10: 	
11: 	public void addNewSql(SessionSQLPair pair) {
12: 		seqSQLQueue.add(pair);
13: 	}
14: 
15: 	private void executeSeq(SessionSQLPair pair) {
16: 		try {
17: 			
18: 			// 使用Druid解析器实现sequence处理  @兵临城下
19: 			DruidSequenceHandler sequenceHandler = new DruidSequenceHandler(MycatServer
20: 					.getInstance().getConfig().getSystem().getSequnceHandlerType());
21: 
22: 			// 生成可执行 SQL :目前主要是生成 id
23: 			String charset = pair.session.getSource().getCharset();
24: 			String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset == null ? "utf-8":charset);
25: 
26: 			// 执行 SQL
27: 			pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema);
28: 		} catch (Exception e) {
29: 			LOGGER.error("MyCATSequenceProcessor.executeSeq(SesionSQLPair)",e);
30: 			pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,"mycat sequnce err." + e);
31: 			return;
32: 		}
33: 	}
34: 	
35: 	class ExecuteThread extends Thread {
36: 		
37: 		public ExecuteThread() {
38: 			setDaemon(true); // 设置为后台线程,防止throw RuntimeExecption进程仍然存在的问题
39: 		}
40: 		
41: 		public void run() {
42: 			while (running) {
43: 				try {
44: 					SessionSQLPair pair=seqSQLQueue.poll(100,TimeUnit.MILLISECONDS);
45: 					if(pair!=null){
46:                         executeSeq(pair);
47: 					}
48: 				} catch (Exception e) {
49: 					LOGGER.warn("MyCATSequenceProcessor$ExecutorThread",e);
50: 				}
51: 			}
52: 		}
53: 	}
54: }

❓此处有个疑问:MyCATSequnceProcessor 是单线程,会不会插入性能有一定的影响?后续咱做下性能测试。

4. 获得 MySQL 连接,执行 SQL

【单库单表】插入(03执行 SQL)

【 1 - 8 】

获得 MySQL 连接。

  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。

【 9 - 13 】

发送 SQL 到 MySQL Server,执行 SQL。

5. 响应执行 SQL 结果

【单库单表】插入(04执行响应)

【 1 - 4 】

处理 MySQL Server 响应数据包。

【 5 - 8 】

发送插入成功结果给 MySQL Client。

666. 彩蛋

知识星球