Hadoop DFS源码研究之---Hadoop RPC机制
先記錄server端的機制
??最初接觸RPC,用自己的思路來猜測RPC的實現(xiàn)機制:
??Server端開啟socket監(jiān)聽,listen()à accept()àread()àwrite()àclose()
??有請求來時開啟thread處理請求,原進程繼續(xù)監(jiān)聽,請求完畢后將結(jié)果返回給client端
??這樣設(shè)計的缺點:
??當訪問量大時,并發(fā)開啟大量線程,會造成server端資源瓶頸。
??每個線程中,read()阻塞,直到有數(shù)據(jù)進來。?
Hadoop server端機制:使用JAVA NIO機制(new IO)?
?JAVA NIO常用的幾個類是 Listener,selector, reader, handler, responder
Hadoop中:
? ? ??acceptChannel = ServerSocketChannel.open();
? ? ? acceptChannel.configureBlocking(false);
? ? ? // Bind the server socket to the local host and port
? ? ? bind(acceptChannel.socket(), address, backlogLength);
綁定之后創(chuàng)建selector
selector= Selector.open();
以及reader pool ?
? ? ? readers = new Reader[readThreads];
? ? ? readPool = Executors.newFixedThreadPool(readThreads);
? ? ? for (int i = 0; i < readThreads; i++) {
? ? ? ? Selector readSelector = Selector.open();
? ? ? ? Reader reader = new Reader(readSelector);
? ? ? ? readers[i] = reader;
? ? ? ? readPool.execute(reader);
? ? ? }
每個reader有一個readselector
然后listener 的?selector開始監(jiān)聽:
? ? while (running) {
? ? ? ? SelectionKey key = null;
? ? ? ? try {
? ? ? ? ??selector.select();
? ? ? ? ? Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
? ? ? ? ? while (iter.hasNext()) {
? ? ? ? ? ? key = iter.next();
? ? ? ? ? ? iter.remove();
? ? ? ? ? ? try {
? ? ? ? ? ? ? if (key.isValid()) {
? ? ? ? ? ? ? ? if (key.isAcceptable())
? ? ? ? ? ? ? ? ??doAccept(key); ??
? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? }
? ? ? ? ? ? key = null;
? ? ? ? ? }
? ? ? ? }?
accept之后
將對應(yīng)的selectionkey 轉(zhuǎn)給Reader
void doAccept(SelectionKey key) throws IOException, ?OutOfMemoryError {
? ? ? Connection c = null;
? ? ??ServerSocketChannel server = (ServerSocketChannel) key.channel();//獲取這個key對應(yīng)的socket句柄 或者說channel
? ? ? SocketChannel channel;
? ? ? while ((channel = server.accept()) != null) {
? ? ? ? channel.configureBlocking(false);
? ? ? ? channel.socket().setTcpNoDelay(tcpNoDelay);
? ? ? ? Reader reader = getReader();
? ? ? ? try {
? ? ? ? ? reader.startAdd();
? ? ? ? ? SelectionKey readKey = reader.registerChannel(channel);
? ? ? ? ? c = new Connection(readKey, channel, System.currentTimeMillis());
? ? ? ? ??readKey.attach(c);
? ? ? ? ? synchronized (connectionList) {
? ? ? ? ? ? connectionList.add(numConnections, c);
? ? ? ? ? ? numConnections++;
? ? ? ? ? }
? ? ? ? ? if (LOG.isDebugEnabled())
? ? ? ? ? ? LOG.debug("Server connection from " + c.toString() +
? ? ? ? ? ? ? ? "; # active connections: " + numConnections +
? ? ? ? ? ? ? ? "; # queued calls: " + callQueue.size()); ? ? ? ? ?
? ? ? ? } finally {
? ? ? ? ? reader.finishAdd();?
? ? ? ? }
? ? ? }
? ? }
將連接轉(zhuǎn)給reader
reader是一直在阻塞的
? ? public void run() {
? ? ? ? LOG.info("Starting SocketReader");
? ? ? ? synchronized (this) {
? ? ? ? ? while (running) {
? ? ? ? ? ? SelectionKey key = null;
? ? ? ? ? ? try {
? ? ? ? ? ? ??readSelector.select();
? ? ? ? ? ? ? while (adding) {
? ? ? ? ? ? ? ? this.wait(1000);
? ? ? ? ? ? ? } ? ? ? ? ? ? ?
? ? ? ? ? ? ? Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
? ? ? ? ? ? ? while (iter.hasNext()) {
? ? ? ? ? ? ? ? key = iter.next();
? ? ? ? ? ? ? ? iter.remove();
? ? ? ? ? ? ? ? if (key.isValid()) {
? ? ? ? ? ? ? ? ? if (key.isReadable()) {
? ? ? ? ? ? ? ? ? ??doRead(key);
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? key = null;
? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? if (running) { ? ? ? ? ? ? ? ? ? ? ?// unexpected -- log it
? ? ? ? ? ? ? ? LOG.info(getName() + " caught: " +
? ? ? ? ? ? ? ? ? ? ? ? ?StringUtils.stringifyException(e));
? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (IOException ex) {
? ? ? ? ? ? ? LOG.error("Error in Reader", ex);
? ? ? ? ? ? }
? ? ? ? ? }
? ? ? ? }
? ? ? }
? void doRead(SelectionKey key) throws InterruptedException {
? ? ? int count = 0;
? ? ??Connection c = (Connection)key.attachment();//獲取這個連接
? ? ? if (c == null) {
? ? ? ? return; ?
? ? ? }
? ? ? c.setLastContact(System.currentTimeMillis());
? ? ??
? ? ? try {
? ? ??? count = c.readAndProcess();//在這個函數(shù)中通過channel讀內(nèi)容 調(diào)用channelRead到Buffer里面讀數(shù)據(jù)
? ? ? } catch (InterruptedException ieo) {
? ? ? ? LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
? ? ? ? throw ieo;
? ? ? } catch (Exception e) {
? ? ? ? LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
? ? ? ? count = -1; //so that the (count < 0) block is executed
? ? ? }
? ? ? if (count < 0) {
? ? ? ? if (LOG.isDebugEnabled())
? ? ? ? ? LOG.debug(getName() + ": disconnecting client " +?
? ? ? ? ? ? ? ? ? ? c + ". Number of active connections: "+
? ? ? ? ? ? ? ? ? ? numConnections);
? ? ? ? closeConnection(c);
? ? ? ? c = null;
? ? ? }
? ? ? else {
? ? ? ? c.setLastContact(System.currentTimeMillis());
? ? ? }
? ? } ??
個人這樣理解:
傳統(tǒng)的socket過程是這樣的:在一個socket 上listen --> accept-->返回一個新的fd,這個fd轉(zhuǎn)給了別的線程,去讀、寫數(shù)據(jù)
JAVA NIO 是這樣的: 在一個socket上listen --> 用一個selector阻塞 accept -->將一個有channel信息的connection轉(zhuǎn)給reader,可以有很多reader,比如hadoop的reader pool -->每個reader有一個selector,selector負責阻塞等待感興趣的信息,等到后通過keys獲得channel的信息,然后通過channel讀內(nèi)容,將內(nèi)容轉(zhuǎn)換成call,給call queue ? ?后續(xù)處理。
所以client端的數(shù)據(jù)傳輸并不是通過server端listen的那個socket 而是和傳統(tǒng)的socket一樣,是通過listen accept后返回的那個socket。
才看一兩天 并不是特別確定這個結(jié)論
請大家指正,謝謝
======續(xù)========
其實JAVA NIO機制中,也可以在listener的selector部分 讀取客戶端的信息 也就是除了判斷信息是不是acceptable之外 也可以判斷channel是否readable、writable,但是這樣仍然會引發(fā)一個阻塞的問題,就是如果readable的信息遲遲不來,這樣仍然會阻塞listen線程 ?hadoop的優(yōu)化是 把讀取信息也異步化了 ?在reader里面讀 ?有一個reader pool。
NIO的selector負責的是監(jiān)聽和內(nèi)容分發(fā)
關(guān)于selector:
Selector與Channel之間的關(guān)聯(lián)由一個SelectionKey實例表示。(注意:一個信道可以注冊多個Selector實例,因此可以有多個關(guān)聯(lián)的SelectionKey實例)。?SelectionKey維護了一個信道上感興趣的操作類型信息,并將這些信息存放在一個int型的位圖中,該int型數(shù)據(jù)的每一位都有相應(yīng)的含義。SelectionKey類中的常量定義了信道上可能感興趣的操作類型,每個這種常量都是只有一位設(shè)置為1的位掩碼。
它在內(nèi)部可以同時管理多個I/O,當一個信道有I/O操作的時 候,他會通知Selector,Selector就是記住這個信道有I/O操作,并且知道是何種I/O操作,hadoop中,acceptable的操作在listener的selector中來監(jiān)聽,可讀的信道操作在reader pool中每個reader的selector來監(jiān)聽。
這樣,對客戶端請求的accept、信息的read ?就分散到幾個線程中了,一個listen線程,幾個read線程,這些線程來輪詢channel。
關(guān)于channel:
一個 Channel實例代表了一個“可輪詢的”I/O目標,如套接字(或一個文件、設(shè)備等)。
Channel能夠注冊一個Selector類的實例。 Selector的select()方法允許你詢問“在一組信道中,哪一個當前需要服務(wù)(即,被接收,讀或?qū)?#xff09;”,這兩個類都包含在 java.nio.channels包中。
一個 Selector實例可以同時檢查一組信道的I/O狀態(tài)。
Selector與Channel之間的關(guān)聯(lián)由一個SelectionKey實例表示。(注意:一個信道可以注冊多個Selector實例,因此可以有多個關(guān)聯(lián)的SelectionKey實例)。?SelectionKey維護了一個信道上感興趣的操作類型信息,并將這些信息存放在一個int型的位圖中,該int型數(shù)據(jù)的每一位都有相應(yīng)的含義。SelectionKey類中的常量定義了信道上可能感興趣的操作類型,每個這種常量都是只有一位設(shè)置為1的位掩碼。
上面的內(nèi)容部分摘自http://vaporz.blog.51cto.com/3142258/587229>
問題:selector和channel是不是多對多的
另外 JAVA NIO的另一個好處是 使用了Buffer 代替 stream ?使得性能可控 這個還需要再學(xué)習(xí)研究 具體沒有什么概念。
總結(jié)
以上是生活随笔為你收集整理的Hadoop DFS源码研究之---Hadoop RPC机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hadoop与Hbase基本配置
- 下一篇: 配置HADOOP开发环境