数据库中间件 MyCAT源码分析:【单库单表】插入【推荐阅读】
???關注微信公眾號:【芋艿的后端小屋】有福利:
- 1. 概述
- 2. 接收請求,解析 SQL
- 3. 獲得路由結果
- 4. 獲得 MySQL 連接,執行 SQL
- 5. 響應執行 SQL 結果
1. 概述
內容形態以 順序圖 + 核心代碼 為主。
如果有地方表述不錯誤或者不清晰,歡迎留言。
對于內容形態,非常糾結,如果有建議,特別特別特別歡迎您提出。
微信號:wangwenbin-server。
本文講解 【單庫單表】插入 所涉及到的代碼。交互如下圖:
單庫單表插入簡圖整個過程,MyCAT Server 流程如下:
我們逐個步驟分析,一起來看看源碼。
2. 接收請求,解析 SQL
【單庫單表】插入(01主流程)【 1 - 2 】
接收一條 MySQL 命令。在【1】之前,還有請求數據讀取、拆成單條 MySQL SQL。
【 3 】
不同 MySQL 命令,分發到不同的方法執行。核心代碼如下:
1: // ??????【FrontendCommandHandler.java】2: public class FrontendCommandHandler implements NIOHandler {3: 4: 5: public void handle(byte[] data) {6: 7: // .... 省略部分代碼8: switch (data[4]) // 9: {10: case MySQLPacket.COM_INIT_DB:11: commands.doInitDB();12: source.initDB(data);13: break;14: case MySQLPacket.COM_QUERY: // 查詢命令15: // 計數查詢命令16: commands.doQuery();17: // 執行查詢命令18: source.query(data);19: break;20: case MySQLPacket.COM_PING:21: commands.doPing();22: source.ping();23: break;24: // .... 省略部分case25: }26: }27: 28: }復制代碼INSERT/SELECT/UPDATE/DELETE 等 SQL 歸屬于 MySQLPacket.COM_QUERY,詳細可見:《MySQL協議分析#4.2 客戶端命令請求報文(客戶端 -> 服務器)》。
##【 4 】
將 二進制數組 解析成 SQL。核心代碼如下:
1: // ??????【FrontendConnection.java】2: public void query(byte[] data) {3: // 取得語句4: String sql = null; 5: try {6: MySQLMessage mm = new MySQLMessage(data);7: mm.position(5);8: sql = mm.readString(charset);9: } catch (UnsupportedEncodingException e) {10: writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");11: return;12: } 13: // 執行語句14: this.query( sql );15: }復制代碼##【 5 】
解析 SQL 類型。核心代碼如下:
1: // ??????【ServerQueryHandler.java】2: 3: public void query(String sql) {4: // 解析 SQL 類型5: int rs = ServerParse.parse(sql);6: int sqlType = rs & 0xff;7: 8: switch (sqlType) {9: //explain sql10: case ServerParse.EXPLAIN:11: ExplainHandler.handle(sql, c, rs >>> 8);12: break;13: // .... 省略部分case14: break;15: case ServerParse.SELECT:16: SelectHandler.handle(sql, c, rs >>> 8);17: break;18: // .... 省略部分case19: default:20: if(readOnly){21: LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());22: c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");23: break;24: }25: c.execute(sql, rs & 0xff);26: }27: }28: 29:30: // ??????【ServerParse.java】31: public static int parse(String stmt) {32: int length = stmt.length();33: //FIX BUG FOR SQL SUCH AS /XXXX/SQL34: int rt = -1;35: for (int i = 0; i < length; ++i) {36: switch (stmt.charAt(i)) {37: // .... 省略部分case case 'I':38: case 'i':39: rt = insertCheck(stmt, i);40: if (rt != OTHER) {41: return rt;42: }43: continue;44: // .... 省略部分case45: case 'S':46: case 's':47: rt = sCheck(stmt, i);48: if (rt != OTHER) {49: return rt;50: }51: continue;52: // .... 省略部分case53: default:54: continue;55: }56: }57: return OTHER;58: }復制代碼##【 6 】
執行 SQL,詳細解析見下文,核心代碼如下:
1: // ??????【ServerConnection.java】2: public class ServerConnection extends FrontendConnection {3: public void execute(String sql, int type) {4: // .... 省略代碼5: SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);6: if (schema == null) {7: writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,8: "Unknown MyCAT Database '" + db + "'");9: return;10: }11: 12: // .... 省略代碼13: 14: // 路由到后端數據庫,執行 SQL15: routeEndExecuteSQL(sql, type, schema);16: }17: 18: public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {19: // 路由計算20: RouteResultset rrs = null;21: try {22: rrs = MycatServer23: .getInstance()24: .getRouterservice()25: .route(MycatServer.getInstance().getConfig().getSystem(),26: schema, type, sql, this.charset, this);27: 28: } catch (Exception e) {29: StringBuilder s = new StringBuilder();30: LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);31: String msg = e.getMessage();32: writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);33: return;34: }35: 36: // 執行 SQL37: if (rrs != null) {38: // session執行39: session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);40: }41: 42: }43: 44: }復制代碼3. 獲得路由結果
【單庫單表】插入(02獲取路由)【 1 - 2 】【 12 】
獲得路由主流程。核心代碼如下:
1: // ??????【RouteService.java】2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,3: int sqlType, String stmt, String charset, ServerConnection sc)4: throws SQLNonTransientException {5: RouteResultset rrs = null;6: // .... 省略代碼7: int hintLength = RouteService.isHintSql(stmt);8: if(hintLength != -1){ // TODO 待讀:hint9: // .... 省略代碼10: }11: } else {12: stmt = stmt.trim();13: rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,14: charset, sc, tableId2DataNodeCache);15: }16: 17: // .... 省略代碼 return rrs;18: }19: // ??????【AbstractRouteStrategy.java】20: 21: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,22: String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {23: 24: // .... 省略代碼25: 26: // 處理一些路由之前的邏輯;全局序列號,父子表插入27: if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {28: return null;29: }30: 31: // .... 省略代碼32: 33: // 檢查是否有分片34: if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {35: rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);36: } else {37: RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);38: if (returnedSet == null) {39: rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);40: }41: }42: 43: return rrs;44: }復制代碼路由 詳細解析,我們另開文章,避免內容過多,影響大家對【插入】流程和邏輯的理解。
【 3 - 6 】
路由前置處理。當符合如下三種情況下,進行處理:
{ 1 } 使用全局序列號:
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')復制代碼{ 2 } ER 子表插入
{ 3 } 主鍵使用自增 ID 插入:
情況 { 1 } { 3 } 情況類似,使用全局序列號。
核心代碼如下:
1: // ??????【AbstractRouteStrategy.java】2: private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)3: throws SQLNonTransientException {4: return // 處理 id 使用 全局序列號5: RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)6: // 處理 ER 子表7: || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))8: // 處理 id 自增長9: || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));10: }復制代碼RouterUtil.java 處理 SQL 考慮性能,實現會比較 C-style,代碼咱就不貼了,傳送門:github.com/YunaiV/Myca… (?該倉庫從官方 Fork,逐步完善中文注釋,歡迎 Star)
【 7 - 11 】
當前置路由處理全局序列號時,添加到全局序列處理器(MyCATSequnceProcessor)。該處理器會異步生成 ID,替換 SQL 內的 NEXT VALUE FOR MYCATSEQ_ 正則。例如:
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name') ===> insert into table (id, name) values (868348974560579584, 'name')復制代碼異步處理完后,調用 ServerConnection#routeEndExecuteSQL(sql, type, schema) 方法重新執行 SQL。
核心代碼如下:
1: // ??????【RouterUtil.java】2: public static void processSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType){3: SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);4: MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);5: }6: // ??????【MyCATSequnceProcessor.java】7: public class MyCATSequnceProcessor {8: private LinkedBlockingQueue<SessionSQLPair> seqSQLQueue = new LinkedBlockingQueue<SessionSQLPair>();9: private volatile boolean running=true;10: 11: public void addNewSql(SessionSQLPair pair) {12: seqSQLQueue.add(pair);13: }14: 15: private void executeSeq(SessionSQLPair pair) {16: try {17: 18: // 使用Druid解析器實現sequence處理 @兵臨城下19: DruidSequenceHandler sequenceHandler = new DruidSequenceHandler(MycatServer20: .getInstance().getConfig().getSystem().getSequnceHandlerType());21: 22: // 生成可執行 SQL :目前主要是生成 id23: String charset = pair.session.getSource().getCharset();24: String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset == null ? "utf-8":charset);25: 26: // 執行 SQL27: pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema);28: } catch (Exception e) {29: LOGGER.error("MyCATSequenceProcessor.executeSeq(SesionSQLPair)",e);30: pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,"mycat sequnce err." + e);31: return;32: }33: }34: 35: class ExecuteThread extends Thread {36: 37: public ExecuteThread() {38: setDaemon(true); // 設置為后臺線程,防止throw RuntimeExecption進程仍然存在的問題39: }40: 41: public void run() {42: while (running) {43: try {44: SessionSQLPair pair=seqSQLQueue.poll(100,TimeUnit.MILLISECONDS);45: if(pair!=null){46: executeSeq(pair);47: }48: } catch (Exception e) {49: LOGGER.warn("MyCATSequenceProcessor$ExecutorThread",e);50: }51: }52: }53: }54: }復制代碼?此處有個疑問:MyCATSequnceProcessor 是單線程,會不會插入性能有一定的影響?后續咱做下性能測試。
4. 獲得 MySQL 連接,執行 SQL
【單庫單表】插入(03執行 SQL)【 1 - 8 】
獲得 MySQL 連接。
- PhysicalDBNode :物理數據庫節點。
- PhysicalDatasource :物理數據庫數據源。
【 9 - 13 】
發送 SQL 到 MySQL Server,執行 SQL。
5. 響應執行 SQL 結果
【單庫單表】插入(04執行響應)【 1 - 4 】
處理 MySQL Server 響應數據包。
【 5 - 8 】
發送插入成功結果給 MySQL Client。
總結
以上是生活随笔為你收集整理的数据库中间件 MyCAT源码分析:【单库单表】插入【推荐阅读】的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 以高效节能为使命 绿色数据中心势在必行
- 下一篇: 又一轮电邮中间人攻击来袭 企业如何自保?