数据库路由中间件MyCat - 源代码篇(7)
此文已由作者張鎬薪授權網易云社區發布。
歡迎訪問網易云社區,了解更多網易技術產品運營經驗。
3. 連接模塊
3.4 FrontendConnection前端連接
構造方法:
public?FrontendConnection(NetworkChannel?channel)?throws?IOException?{?????super(channel);InetSocketAddress?localAddr?=?(InetSocketAddress)?channel.getLocalAddress();InetSocketAddress?remoteAddr?=?null;?????if?(channel?instanceof?SocketChannel)?{remoteAddr?=?(InetSocketAddress)?((SocketChannel)?channel).getRemoteAddress();????}?else?if?(channel?instanceof?AsynchronousSocketChannel)?{remoteAddr?=?(InetSocketAddress)?((AsynchronousSocketChannel)?channel).getRemoteAddress();}?????this.host?=?remoteAddr.getHostString();?????this.port?=?localAddr.getPort();?????this.localPort?=?remoteAddr.getPort();?????this.handler?=?new?FrontendAuthenticator(this);}FrontendConnection是對前端連接channel的封裝,接受NetworkChannel作為參數構造。前端連接建立,需要先驗證其權限,所以,handler首先設置為FrontendAuthenticator 等到驗證成功,handler會被設置成FrontendCommandHandler。 下面來看和FrontendConnection相關的Handler: ?FrontendCommandHandler會先解析請求類型,之后調用不同的方法處理不同類型的請求。例如,FrontendQueryHandler會解析query類型的sql請求語句:
?@Overridepublic?void?handle(byte[]?data){????????if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData()){MySQLMessage?mm?=?new?MySQLMessage(data);????????????int??packetLength?=?mm.readUB3();????????????if(packetLength+4==data.length){source.loadDataInfileData(data);}????????????return;}????????switch?(data[4]){????????????case?MySQLPacket.COM_INIT_DB:commands.doInitDB();source.initDB(data);????????????????break;????????????case?MySQLPacket.COM_QUERY:commands.doQuery();source.query(data);????????????????break;????????????case?MySQLPacket.COM_PING:commands.doPing();source.ping();????????????????break;????????????case?MySQLPacket.COM_QUIT:commands.doQuit();source.close("quit?cmd");????????????????break;????????????case?MySQLPacket.COM_PROCESS_KILL:commands.doKill();source.kill(data);????????????????break;????????????case?MySQLPacket.COM_STMT_PREPARE:commands.doStmtPrepare();source.stmtPrepare(data);????????????????break;????????????case?MySQLPacket.COM_STMT_EXECUTE:commands.doStmtExecute();source.stmtExecute(data);????????????????break;????????????case?MySQLPacket.COM_STMT_CLOSE:commands.doStmtClose();source.stmtClose(data);????????????????break;????????????case?MySQLPacket.COM_HEARTBEAT:commands.doHeartbeat();source.heartbeat(data);????????????????break;????????????default:commands.doOther();source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,?????????????????????????????"Unknown?command");}}FrontendCommandHandler會調用FrontendConnection合適的方法解析處理不同的請求,例如它的initDB(byte[] data)方法:
????public?void?initDB(byte[]?data)?{MySQLMessage?mm?=?new?MySQLMessage(data);mm.position(5);String?db?=?mm.readString();????????//?檢查schema的有效性if?(db?==?null?||?!privileges.schemaExists(db))?{writeErrMessage(ErrorCode.ER_BAD_DB_ERROR,?"Unknown?database?'"?+?db?+?"'");????????????return;}????????if?(!privileges.userExists(user,?host))?{writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR,?"Access?denied?for?user?'"?+?user?+?"'");????????????return;}Set<String>?schemas?=?privileges.getUserSchemas(user);????????if?(schemas?==?null?||?schemas.size()?==?0?||?schemas.contains(db))?{????????????this.schema?=?db;write(writeToBuffer(OkPacket.OK,?allocate()));}?else?{String?s?=?"Access?denied?for?user?'"?+?user?+?"'?to?database?'"?+?db?+?"'";writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR,?s);}}方法調用: 通過查看可以發現,在command packet被解析出是initDB類型的請求時(其實就是用戶發送的查詢語句為“use XXX”),會調用此方法進行處理,同時,這些方法都是被RW線程執行的。 此方法從FrontedPrivilege中驗證用戶是否有權限訪問這個邏輯庫,如果有就把當前連接的邏輯庫設為用戶請求的邏輯庫。 其他方法與handler也是相似的關系,可以看出,FrontendConnection組合了多種封裝的handler來處理不同的請求的不同階段。至于各種handler,會在之后sql解析,sql路由,協議實現等模塊詳細介紹。
3.4.1 ServerConnection服務端連接
前端連接包括ServerConnection(服務端連接)和ManagerConnection(管理端連接)。前端鏈接不會直接創建,而是通過工廠創建: 工廠方法:
@Overrideprotected?FrontendConnection?getConnection(NetworkChannel?channel)?throws?IOException?{SystemConfig?sys?=?MycatServer.getInstance().getConfig().getSystem();ServerConnection?c?=?new?ServerConnection(channel);MycatServer.getInstance().getConfig().setSocketParams(c,?true);c.setPrivileges(MycatPrivileges.instance());c.setQueryHandler(new?ServerQueryHandler(c));c.setLoadDataInfileHandler(new?ServerLoadDataInfileHandler(c));????????//?c.setPrepareHandler(new?ServerPrepareHandler(c));c.setTxIsolation(sys.getTxIsolation());c.setSession2(new?NonBlockingSession(c));????????return?c;}可以看出,每個新的ServerConnection都會綁定一個新的ServerQueryHandler負責處理sql指令,一個ServerLoadDataInfileHandler負責處理文件載入命令,一個session負責處理事務 下面是相關的類圖 ?這里的所有獨立的handler里面都是static方法,可供其他類直接調用。每個ServerConnection都會有一個NonBlockingSession來處理。 這里說下連接、會話、邏輯庫、MyCat實例的關系(與MySQL里面的連接、會話、數據庫、MySQL實例的關系不太一樣);首先每個MyCat實例都管理多個數據庫。連接是針對MyCat實例建立的,并且,MyCat的連接(AbstractConnection)是不可復用的,在close方法會關閉連接并清理使用的資源。但是緩存資源(buffer)是可以復用的。比如,在一個前端連接長時間空閑時或者出現異常時,會被清理掉。每個連接會擁有一個session來處理事務,保存會話信息。 這里,每個連接擁有一個會話。每個連接中的方法,被RW線程執行,相當于與RW線程綁定。RW線程是可以復用的,這里相當于MySQL中的連接是可以復用的(連接池)。 Session.java:
public?interface?Session?{????/***?取得源端連接*/FrontendConnection?getSource();????/***?取得當前目標端數量*/int?getTargetCount();????/***?開啟一個會話執行*/void?execute(RouteResultset?rrs,?int?type);????/***?提交一個會話執行*/void?commit();????/***?回滾一個會話執行*/void?rollback();????/***?取消一個正在執行中的會話*?*?@param?sponsor*????????????如果發起者為null,則表示由自己發起。*/void?cancel(FrontendConnection?sponsor);????/***?終止會話,必須在關閉源端連接后執行該方法。*/void?terminate();}下面我們著重研究它的實現類NonBlockingSession: 首先,取得源端連接方法FrontendConnection getSource();,其實就是NonBlockingSession在創建時就已綁定一個連接,誰會調用這個方法取得源端鏈接呢? ?可以發現,主要有各種查詢的handler還有SQLengine會去調用。因為處理無論返回什么結果,都需要返回給源端。 int getTargetCount();取得當前目標端數量。根據目標端的數量不同會用不同的handler處理轉發SQL和合并結果。
@Overridepublic?void?execute(RouteResultset?rrs,?int?type)?{????????//?清理之前處理用的資源clearHandlesResources();????????if?(LOGGER.isDebugEnabled())?{StringBuilder?s?=?new?StringBuilder();LOGGER.debug(s.append(source).append(rrs).toString()?+?"?rrs?");}????????//?檢查路由結果是否為空RouteResultsetNode[]?nodes?=?rrs.getNodes();????????if?(nodes?==?null?||?nodes.length?==?0?||?nodes[0].getName()?==?null||?nodes[0].getName().equals(""))?{????????????//如果為空,則表名有誤,提示客戶端source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,????????????????????"No?dataNode?found?,please?check?tables?defined?in?schema:"+?source.getSchema());????????????return;}????????//如果路由結果個數為1,則為單點查詢或事務if?(nodes.length?==?1)?{????????????//使用SingleNodeHandler處理單點查詢或事務singleNodeHandler?=?new?SingleNodeHandler(rrs,?this);????????????try?{singleNodeHandler.execute();}?catch?(Exception?e)?{LOGGER.warn(new?StringBuilder().append(source).append(rrs),?e);source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA,?e.toString());}}?else?{????????????//如果路由結果>1,則為多點查詢或事務boolean?autocommit?=?source.isAutocommit();SystemConfig?sysConfig?=?MycatServer.getInstance().getConfig().getSystem();????????????//mutiNodeLimitType沒有用。。。int?mutiNodeLimitType?=?sysConfig.getMutiNodeLimitType();????????????//使用multiNodeHandler處理多點查詢或事務multiNodeHandler?=?new?MultiNodeQueryHandler(type,?rrs,?autocommit,????????????????????this);????????????try?{multiNodeHandler.execute();}?catch?(Exception?e)?{LOGGER.warn(new?StringBuilder().append(source).append(rrs),?e);source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA,?e.toString());}}}每次一個Session執行SQL時,會先清理handler使用的資源。SingleNodeHandler與multiNodeHandler之后會講。這里的handler我們之后會在每個模塊去講,Session之后也還會提到,敬請期待
免費體驗云安全(易盾)內容安全、驗證碼等服務
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】?Android TV 開發(5)
【推薦】?分布式存儲系統可靠性系列三:設計模式
轉載于:https://www.cnblogs.com/zyfd/p/9894735.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的数据库路由中间件MyCat - 源代码篇(7)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 洛谷 P3332 [ZJOI2013]K
- 下一篇: 父控件点击不上