Hadoop基于Protocol Buffer的RPC实现代码分析-Server端--转载
原文地址:http://yanbohappy.sinaapp.com/?p=110
最新版本的Hadoop代碼中已經默認了Protocol buffer(以下簡稱PB,http://code.google.com/p/protobuf/)作為RPC的默認實現,原來的WritableRpcEngine已經被淘汰了。來自cloudera的Aaron T. Myers在郵件中這樣說的“since PB can provide support for evolving protocols in a compatible fashion.”
首先要明白PB是什么,PB是Google開源的一種輕便高效的結構化數據存儲格式,可以用于結構化數據序列化/反序列化,很適合做數據存儲或 RPC 數據交換格式。它可用于通訊協議、數據存儲等領域的語言無關、平臺無關、可擴展的序列化結構數據格式。目前提供了 C++、Java、Python 三種語言的 API。簡單理解就是某個進程把一些結構化數據通過網絡通信的形式傳遞給另外一個進程(典型應用就是RPC);或者某個進程要把某些結構化數據持久化存儲到磁盤上(這個有點類似于在Mongodb中的BSON格式)。對于存儲的這個例子來說,使用PB和XML,JSON相比的缺點就是存儲在磁盤上的數據用戶是無法理解的,除非用PB反序列化之后才行,這個有點類似于IDL。優點就是序列化/反序列化速度快,網絡或者磁盤IO傳輸的數據少,這個在Data-Intensive Scalable Computing中是非常重要的。
Hadoop使用PB作為RPC實現的另外一個原因是PB的語言、平臺無關性。在mailing list里聽說過社區的人有這樣的考慮:就是現在每個MapReduce task都是在一個JVM虛擬機上運行的(即使是Streaming的模式,MR任務的數據流也是通過JVM與NN或者DN進行RPC交換的),JVM最嚴重的問題就是內存,例如OOM。我看社區里有人討論說如果用PB這樣的RPC實現,那么每個MR task都可以直接與NN或者DN進行RPC交換了,這樣就可以用C/C++來實現每一個MR task了。百度做的HCE(https://issues.apache.org/jira/browse/MAPREDUCE-1270)和這種思路有點類似,但是由于當時的Hadoop RPC通信還是通過WritableRpcEngine來實現的,所以MR task還是沒有擺脫通過本地的JVM代理與NN或者DN通信的束縛,因為Child JVM Process還是存在的,還是由它來設置運行時環境和RPC交互。
關于PB的原理和實現,請大家參考http://code.google.com/p/protobuf/或者http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/?ca=drs-tp4608,本文不再贅述。
下面來看看Hadoop代碼中的RPC是如何實現的。RPC就是一臺機器上的某個進程要調用另外一臺機器上的某個進程的方法,中間通信傳輸的就是類似于“方法名、參數1、參數2……”這樣的信息,是結構化的。同時通信除了這些RPC實體以外,還要有header等。
我們要定義一種PB實現的RPC傳輸格式,首先要定義相應的.proto文件,在Hadoop common工程里,這些文件放在D:\Hadoop-trunk\hadoop-common-project\hadoop-common\src\main\proto目錄下;在Hadoop HDFS工程里這些文件放在D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目錄下,以此類推。Hadoop編譯腳本會調用相應的protoc二進制程序來編譯這些以.proto結尾的文件,生成相應的.java文件。
以D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目錄下的ClientNamenodeProtocol.proto為例說明。文件最開始定義了一些參數:
option java_package = "org.apache.hadoop.hdfs.protocol.proto";option java_outer_classname = "ClientNamenodeProtocolProtos";option java_generic_services = true;option java_generate_equals_and_hash = true;這個表示這個.proto文件經過protoc編譯之后會生成org.apache.hadoop.hdfs.protocol.proto這個包下面的ClientNamenodeProtocolProtos.java類文件,那么在Hadoop源碼里就可以調用這個類里的方法了。
這個文件的主體主要是兩種數據類型message和rpc,仔細看下這個文件就知道了,message就是這個ClientNamenodeProtocol協議中傳輸的結構體,rpc就是調用的方法。那么這兩種類型在經過編譯之后會生成什么呢?
編譯之后,在Hadoop-trunk/hadoop-hdfs-project/hadoop-hdfs/target/generated-sources/java/org/apache/hadoop/hdfs/protocol/proto目錄里生成了ClientNamenodeProtocolProtos.java文件,里面把message都包裝成了類,而把rpc都包裝成了方法。這個文件是由PB編譯器自動生成的,所以不能修改。
有了這些java類之后,我們就可以看看在Server端是怎么實現RPC的了。首先還是NameNode初始化的流程,會調用到rpcServer = createRpcServer(conf)來創建RPC server。下面看看NameNodeRpcServer的構造函數里都做了哪些工作:
public NameNodeRpcServer(Configuration conf, NameNode nn)throws IOException {this.nn = nn;this.namesystem = nn.getNamesystem();this.metrics = NameNode.getNameNodeMetrics();int handlerCount =conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,DFS_NAMENODE_HANDLER_COUNT_DEFAULT);InetSocketAddress socAddr = nn.getRpcServerAddress(conf);//設置ProtolEngine,目前只支持PB協議。表示接收到的RPC協議如果是ClientNamenodeProtocolPB,//那么處理這個RPC協議的引擎是ProtobufRpcEngineRPC.setProtocolEngine(conf,ClientNamenodeProtocolPB.class,ProtobufRpcEngine.class);//聲明一個ClientNamenodeProtocolServerSideTranslatorPB,//這個類負責把Server接收到的PB格式對象的數據,拼裝成NameNode內村中的數據類型,//調用NameNodeRpcServer類中相應的邏輯,然后再把執行結果拼裝成PB格式。 ClientNamenodeProtocolServerSideTranslatorPBclientProtocolServerTranslator =new ClientNamenodeProtocolServerSideTranslatorPB(this);BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =new DatanodeProtocolServerSideTranslatorPB(this);BlockingService dnProtoPbService = DatanodeProtocolService.newReflectiveBlockingService(dnProtoPbTranslator);NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =new NamenodeProtocolServerSideTranslatorPB(this);BlockingService NNPbService = NamenodeProtocolService.newReflectiveBlockingService(namenodeProtocolXlator);RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService.newReflectiveBlockingService(refreshAuthPolicyXlator);RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =new RefreshUserMappingsProtocolServerSideTranslatorPB(this);BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService.newReflectiveBlockingService(refreshUserMappingXlator);GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =new GetUserMappingsProtocolServerSideTranslatorPB(this);BlockingService getUserMappingService = GetUserMappingsProtocolService.newReflectiveBlockingService(getUserMappingXlator);HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =new HAServiceProtocolServerSideTranslatorPB(this);BlockingService haPbService = HAServiceProtocolService.newReflectiveBlockingService(haServiceProtocolXlator);WritableRpcEngine.ensureInitialized();InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);if (dnSocketAddr != null) {int serviceHandlerCount =conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);// Add all the RPC protocols that the namenode implementsthis.serviceRpcServer =RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class, clientNNPbService,dnSocketAddr.getHostName(), dnSocketAddr.getPort(),serviceHandlerCount,false, conf, namesystem.getDelegationTokenSecretManager());DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,serviceRpcServer);DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,serviceRpcServer);DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,serviceRpcServer);DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,refreshAuthService, serviceRpcServer);DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,refreshUserMappingService, serviceRpcServer);DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,getUserMappingService, serviceRpcServer);this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();nn.setRpcServiceServerAddress(conf, serviceRPCAddress);} else {serviceRpcServer = null;serviceRPCAddress = null;}// Add all the RPC protocols that the namenode implementsthis.clientRpcServer = RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,clientNNPbService, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf,namesystem.getDelegationTokenSecretManager());DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,refreshAuthService, clientRpcServer);DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,refreshUserMappingService, clientRpcServer);DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,getUserMappingService, clientRpcServer);// set service-level authorization security policyif (serviceAuthEnabled =conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());if (this.serviceRpcServer != null) {this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());}}// The rpc-server port can be ephemeral... ensure we have the correct infothis.clientRpcAddress = this.clientRpcServer.getListenerAddress();nn.setRpcServerAddress(conf, clientRpcAddress);this.minimumDataNodeVersion = conf.get(DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);}ClientNamenodeProtocol是protoc編譯生成的ClientNamenodeProtocolProtos類中的inner class。
public static com.google.protobuf.BlockingServicenewReflectiveBlockingService(final BlockingInterface impl) {……}這個方法也是由protoc編譯器自動生成的。這個方法會返回一個com.google.protobuf.BlockingService類型的對象,這種類型的對象定義了RPC的各種服務,后面會講。
this.clientRpcServer = RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,clientNNPbService, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf,namesystem.getDelegationTokenSecretManager());這個RPC.getServer()函數生成一個Server對象,負責接收網絡連接,讀取數據,調用處理數據函數,返回結果。這個Server對象里有Listener, Handler, Responder內部類,分別開啟多個線程負責監聽、讀取、處理和返回結果。前兩個參數表示如果RPC發送過來的是ClientNamenodeProtocolPB協議,那么負責處理這個協議的服務(com.google.protobuf.BlockingService類型的對象)就是clientNNPbService。
這個RPC.getServer()會經過層層調用,因為現在默認的RPCEngine是ProtobufRpcEngine(ProtobufRpcEngine.java),就會調用到下面這個函數,在這生成了一個Server對象,就是用于接收client端RPC請求,處理,回復的Server。這個Server對象是一個純粹的網絡服務的Server,在RPC中起到基礎網絡IO服務的作用。
public RPC.Server getServer(Class<?> protocol, Object protocolImpl,String bindAddress, int port, int numHandlers, int numReaders,int queueSizePerHandler, boolean verbose, Configuration conf,SecretManager<? extends TokenIdentifier> secretManager,String portRangeConfig)throws IOException {return new Server(protocol, protocolImpl, conf, bindAddress, port,numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,portRangeConfig);}現在該用到的東西都生成好了,就要看看client端來了一個RPC請求之后,Server端是怎么處理的呢?
Server里的Reader線程也是基于Selector的異步IO模式,每次Select選出一個SelectionKey之后,會調用SelectionKey.attachment()把這個SelectionKey所attach的Connection對象獲取,然后執行對應的readAndProcess()方法,把這個SelectionKey所對應的管道上的網絡IO數據讀入緩沖區。readAndProcess()方法會層層調用到Server.processData()方法,在這個方法內部,會把剛才從網絡IO中讀取的數據反序列化成對象rpcRequest對象。rpcRequest對象的類型是繼承自Writable類型的子類的對象,也就是說可以序列化/反序列化的類。這里rpcRequest對象里包含的RPC請求的內容對象是由.proto文件中Message生成的類,也就是說PB框架自動編譯出來的類,后面可以通過調用這個類的get方法獲取RPC中真正傳輸的數據。之后把生成的rpcRequest對象放到一個Call對象里面,再把Call對象放到隊列Server.callQueue里面。至此網絡服務器的Reader線程做的工作就OK了。
下面看看Handler線程是怎么處理的。Handler線程默認有10個,所以處理邏輯是多線程的。每個Handler線程會從剛才提到的callQueue中取一個Call對象,然后調用Server.call()方法執行這個Call對象中蘊含的RPC請求。Server.call()->RPC.Server.call()->Server.getRpcInvoker()->ProtobufRpcInvoker.call()在最后這個call()函數里面真正執行嘍。。。。重點看這個函數,首先校驗這個請求發過來的數據是不是合理的。然后就是獲取實現這個協議的服務。實現協議的服務在初始化的時候已經注冊過了,就是前面說的那個com.google.protobuf.BlockingService類型的對象,例如:
BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);這個就是實現Client和NameNode之間的ClientNamenodeProtocol協議的服務。當然還有dnProtoPbService, NNPbService, refreshAuthService, refreshUserMappingService, haPbService等等這些不同的服務。
這個Service獲取了之后,通過調用這句代碼
result = service.callBlockingMethod(methodDescriptor, null, param);就會執行這個RPC請求的邏輯。
再往深入執行就要涉及到google protocol buffer內部的東西了,這個service對象會把相應的方法調用轉移到一個繼承自BlockingInterface接口的實現類上。Service的真正實現類就是clientProtocolServerTranslator,是newReflectiveBlockingService()這個函數的參數。
BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);這個初始化過程中的參數,也就是service.callBlockingMethod()真正調用的是clientProtocolServerTranslator中對應的方法。這一點可以通過由protoc自動編譯生成的代碼中看出:
public static com.google.protobuf.BlockingServicenewReflectiveBlockingService(final BlockingInterface impl) {return new com.google.protobuf.BlockingService() {public final com.google.protobuf.Descriptors.ServiceDescriptorgetDescriptorForType() {return getDescriptor();}public final com.google.protobuf.Message callBlockingMethod(com.google.protobuf.Descriptors.MethodDescriptor method,com.google.protobuf.RpcController controller,com.google.protobuf.Message request)throws com.google.protobuf.ServiceException {if (method.getService() != getDescriptor()) {throw new java.lang.IllegalArgumentException("Service.callBlockingMethod() given method descriptor for " +"wrong service type.");}switch(method.getIndex()) {case 0:return impl.getBlockLocations(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto)request);case 1:return impl.getServerDefaults(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto)request);case 2:return impl.create(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto)request);case 3:return impl.append(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto)request);…… } …… }上面就是proto編譯生成的ClientNamenodeProtocolProtos.java文件,從中可以看出對callBlockingMethod()方法的調用都是轉移到BlockingInterface impl上面了。
然后我們看看clientProtocolServerTranslator是怎么進一步執行的。下面以getBlockLocations()函數為例說明:
public GetBlockLocationsResponseProto getBlockLocations(RpcController controller, GetBlockLocationsRequestProto req)throws ServiceException {try {//下面這個server是由NameNodeRpcServer類生成的對象,定義了HDFS元數據操作邏輯。LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),req.getLength());//由于server返回的是NameNode內存中的數據結構,要把這個結果通過RPC傳回client端,//那么我們需要利用PB框架提供的對應Message的Builder類,把內存中的數據結構通過這個接口序列化。Builder builder = GetBlockLocationsResponseProto.newBuilder();if (b != null) {builder.setLocations(PBHelper.convert(b)).build();}return builder.build();} catch (IOException e) {throw new ServiceException(e);}}至此,Hadoop的RPC流程Server端已經分析結束,不過這個是正確執行的流程。如果中間拋出了異常呢?還是以上面這個getBlockLocations()函數為例,如果元數據操作邏輯NameNodeRpcServer里面拋出IOException,那么它都會把它封裝成ServiceException,然后一路傳遞給client端。在client端,會通過ProtobufHelper.getRemoteException()把封裝在ServiceException中的IOException獲取出來。
?
轉載于:https://www.cnblogs.com/davidwang456/p/4773914.html
總結
以上是生活随笔為你收集整理的Hadoop基于Protocol Buffer的RPC实现代码分析-Server端--转载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hdfs源码分析第一弹
- 下一篇: HDFS集中式的缓存管理原理与代码剖析-